Subject: svn commit: r925047 - in /hadoop/hbase/branches/0.20: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/ src/java/org/apache/h...
Date: Thu, 18 Mar 2010 23:51:49 -0000
To: hbase-commits@hadoop.apache.org
From: stack@apache.org
Reply-To: hbase-dev@hadoop.apache.org
Message-Id: <20100318235149.D5AA023888CE@eris.apache.org>

Author: stack
Date: Thu Mar 18 23:51:49 2010
New Revision: 925047

URL: http://svn.apache.org/viewvc?rev=925047&view=rev
Log:
HBASE-2283 row level atomicity

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Mar 18 23:51:49 2010
@@ -45,6 +45,7 @@ Release 0.20.4 - Unreleased
    HBASE-2305  Client port for ZK has no default (Suraj Varma via Stack)
    HBASE-2323  filter.RegexStringComparator does not work with certain bytes
                (Benoit Sigoure via Stack)
+   HBASE-2283  row level atomicity (Kannan Muthukkaruppan via Stack)
 
   IMPROVEMENTS
    HBASE-2180  Bad read performance from synchronizing hfile.fddatainputstream

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java Thu Mar 18 23:51:49 2010
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HLogKey;
 import org.apache.hadoop.hbase.regionserver.LogRollListener;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.io.SequenceFile;
 
 /**
@@ -49,7 +50,7 @@ class THLog extends HLog {
 
   @Override
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return super.createWriter(path, THLogKey.class, KeyValue.class);
+    return super.createWriter(path, THLogKey.class, WALEdit.class);
   }
 
   @Override
@@ -90,16 +91,17 @@ class THLog extends HLog {
    */
   public void append(HRegionInfo regionInfo, long now, THLogKey.TrxOp txOp,
       long transactionId) throws IOException {
-    THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
-        .getTableDesc().getName(), -1, now, txOp, transactionId);
-    super.append(regionInfo, key, new KeyValue(new byte [0], 0, 0)); // Empty KeyValue
+    THLogKey key = new THLogKey(regionInfo.getRegionName(),
+        regionInfo.getTableDesc().getName(), -1, now, txOp, transactionId);
+    WALEdit e = new WALEdit();
+    e.add(new KeyValue(new byte [0], 0, 0)); // Empty KeyValue
+    super.append(regionInfo, key, e, regionInfo.isMetaRegion());
   }
 
   /**
    * Write a transactional update to the log.
    *
    * @param regionInfo
-   * @param now
    * @param update
    * @param transactionId
    * @throws IOException
@@ -114,7 +116,9 @@ class THLog extends HLog {
         transactionId);
 
     for (KeyValue value : convertToKeyValues(update)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
     }
   }
 
@@ -122,8 +126,7 @@ class THLog extends HLog {
    * Write a transactional delete to the log.
    *
    * @param regionInfo
-   * @param now
-   * @param update
+   * @param delete
    * @param transactionId
    * @throws IOException
    */
@@ -137,7 +140,9 @@ class THLog extends HLog {
         transactionId);
 
     for (KeyValue value : convertToKeyValues(delete)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
    }
  }
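[Editorial note for readers: the new WALEdit class is referenced throughout this diff but its source is not included in the message. From its usage here (add, isEmpty, size, getKeyValues, and Writable round-trips through SequenceFile), a minimal sketch might look like the following. The field name and on-disk layout are assumptions for illustration, not the committed implementation.]

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.io.Writable;

    /** Bundles all KeyValues of one row-level transaction into a single WAL entry. */
    public class WALEdit implements Writable {
      private final List<KeyValue> kvs = new ArrayList<KeyValue>();

      public void add(KeyValue kv) { kvs.add(kv); }
      public boolean isEmpty()     { return kvs.isEmpty(); }
      public int size()            { return kvs.size(); }
      public List<KeyValue> getKeyValues() { return kvs; }

      public void write(DataOutput out) throws IOException {
        out.writeInt(kvs.size());
        for (KeyValue kv : kvs) {
          kv.write(out);             // each KeyValue is length-prefixed
        }
      }

      public void readFields(DataInput in) throws IOException {
        kvs.clear();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          KeyValue kv = new KeyValue();
          kv.readFields(in);         // reads length, then the bytes
          kvs.add(kv);
        }
      }
    }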
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java Thu Mar 18 23:51:49 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.Progressable;
@@ -86,7 +87,7 @@ class THLogRecoveryManager {
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
-  public Map<Long, List<KeyValue>> getCommitsFromLog(
+  public Map<Long, List<WALEdit>> getCommitsFromLog(
       final Path reconstructionLog, final long maxSeqID,
       final Progressable reporter) throws UnsupportedEncodingException,
       IOException {
@@ -102,7 +103,8 @@ class THLogRecoveryManager {
       return null;
     }
 
-    SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> pendingTransactionsById =
+      new TreeMap<Long, List<WALEdit>>();
     Set<Long> commitedTransactions = new HashSet<Long>();
     Set<Long> abortedTransactions = new HashSet<Long>();
 
@@ -111,7 +113,7 @@ class THLogRecoveryManager {
 
     try {
       THLogKey key = new THLogKey();
-      KeyValue val = new KeyValue();
+      WALEdit val = new WALEdit();
       long skippedEdits = 0;
       long totalEdits = 0;
       long startCount = 0;
@@ -119,6 +121,9 @@ class THLogRecoveryManager {
       long abortCount = 0;
       long commitCount = 0;
       // How many edits to apply before we send a progress report.
+
+
+
       int reportInterval = conf.getInt("hbase.hstore.report.interval.edits", 2000);
@@ -137,18 +142,18 @@ class THLogRecoveryManager {
         }
 
         long transactionId = key.getTransactionId();
-        List<KeyValue> updates = pendingTransactionsById.get(transactionId);
+        List<WALEdit> updates = pendingTransactionsById.get(transactionId);
         switch (key.getTrxOp()) {
 
         case OP:
           if (updates == null) {
-            updates = new ArrayList<KeyValue>();
+            updates = new ArrayList<WALEdit>();
             pendingTransactionsById.put(transactionId, updates);
             startCount++;
           }
 
           updates.add(val);
-          val = new KeyValue();
+          val = new WALEdit();
           writeCount++;
           break;
 
@@ -210,15 +215,16 @@ class THLogRecoveryManager {
     return null;
   }
 
-  private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
-      SortedMap<Long, List<KeyValue>> pendingTransactionsById
+  private SortedMap<Long, List<WALEdit>> resolvePendingTransaction(
+      SortedMap<Long, List<WALEdit>> pendingTransactionsById
   ) {
-    SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> commitedTransactionsById =
+      new TreeMap<Long, List<WALEdit>>();
 
     LOG.info("Region log has " + pendingTransactionsById.size()
         + " unfinished transactions. Going to the transaction log to resolve");
 
-    for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
+    for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) {
       if (entry.getValue().isEmpty()) {
         LOG.debug("Skipping resolving trx ["+entry.getKey()+"] has no writes.");
       }

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Thu Mar 18 23:51:49 2010
@@ -54,12 +54,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
-import org.apache.hadoop.hbase.regionserver.HLog;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.*;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.util.Progressable;
 
@@ -142,20 +137,23 @@ public class TransactionalRegion extends
     }
 
     THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
-    Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
+    Map<Long, List<WALEdit>> commitedTransactionsById = recoveryManager
         .getCommitsFromLog(oldLogFile, minSeqId, reporter);
 
     if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
       LOG.debug("found " + commitedTransactionsById.size()
           + " COMMITED transactions to recover.");
 
-      for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
+      for (Entry<Long, List<WALEdit>> entry : commitedTransactionsById
           .entrySet()) {
         LOG.debug("Writing " + entry.getValue().size()
             + " updates for transaction " + entry.getKey());
-        for (KeyValue b : entry.getValue()) {
-          Put put = new Put(b.getRow());
-          put.add(b);
+        for (WALEdit b : entry.getValue()) {
+          Put put = null;
+          for (KeyValue kv: b.getKeyValues()) {
+            if (put == null) put = new Put(kv.getRow());
+            put.add(kv);
+          }
           super.put(put, true); // These are walled so they live forever
         }
       }

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Thu Mar 18 23:51:49 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -100,7 +101,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
     assertNull(commits);
@@ -130,7 +131,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
     assertNull(commits);
@@ -165,7 +166,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);
@@ -200,7 +201,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);
@@ -235,7 +236,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java Thu Mar 18 23:51:49 2010
@@ -1794,14 +1794,22 @@ public class KeyValue implements Writabl
         (2 * Bytes.SIZEOF_INT));
   }
 
-  // Writable
-  public void readFields(final DataInput in) throws IOException {
-    this.length = in.readInt();
+  // this overload assumes that the length bytes have already been read,
+  // and it expects the length of the KeyValue to be explicitly passed
+  // to it.
+  public void readFields(int length, final DataInput in) throws IOException {
+    this.length = length;
     this.offset = 0;
     this.bytes = new byte[this.length];
     in.readFully(this.bytes, 0, this.length);
   }
 
+  // Writable
+  public void readFields(final DataInput in) throws IOException {
+    int length = in.readInt();
+    readFields(length, in);
+  }
+
   public void write(final DataOutput out) throws IOException {
     out.writeInt(this.length);
     out.write(this.bytes, this.offset, this.length);
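[Editorial note: splitting readFields into a length-taking overload is the standard trick for nesting one length-prefixed Writable inside another: the container may already have consumed the leading int while probing the stream. One plausible use is backward compatibility with logs whose value is a bare KeyValue rather than a WALEdit. A hedged sketch of such a reader, continuing the WALEdit sketch above, follows; VERSION_MARKER is a hypothetical sentinel, not a constant from this commit, and the matching write() would need to emit it.]

    // Backward-compatible variant of WALEdit.readFields (sketch only).
    private static final int VERSION_MARKER = -1;

    public void readFields(DataInput in) throws IOException {
      kvs.clear();
      int first = in.readInt();
      if (first == VERSION_MARKER) {
        // new-style entry: a count of KeyValues follows
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          KeyValue kv = new KeyValue();
          kv.readFields(in);          // reads its own length prefix
          kvs.add(kv);
        }
      } else {
        // old-style entry: 'first' was the length prefix of a bare KeyValue
        KeyValue kv = new KeyValue();
        kv.readFields(first, in);     // length already consumed above
        kvs.add(kv);
      }
    }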
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Thu Mar 18 23:51:49 2010
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -439,7 +440,7 @@ public class HLog implements HConstants,
   }
 
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return createWriter(path, HLogKey.class, KeyValue.class);
+    return createWriter(path, HLogKey.class, WALEdit.class);
   }
 
   // usage: see TestLogRolling.java
@@ -448,7 +449,7 @@ public class HLog implements HConstants,
   }
 
   protected SequenceFile.Writer createWriter(Path path,
-      Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
+      Class<? extends HLogKey> keyClass, Class<? extends WALEdit> valueClass)
       throws IOException {
     return SequenceFile.createWriter(this.fs, this.conf, path, keyClass,
         valueClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs
@@ -637,12 +638,13 @@ public class HLog implements HConstants,
    * @param now Time of this edit write.
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit,
-    final long now)
+  public void append(HRegionInfo regionInfo, WALEdit logEdit,
+    final long now,
+    final boolean isMetaRegion)
   throws IOException {
     byte [] regionName = regionInfo.getRegionName();
     byte [] tableName = regionInfo.getTableDesc().getName();
-    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
+    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
   }
 
 /**
@@ -664,7 +666,8 @@ public class HLog implements HConstants,
    * @param logKey
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
+    final boolean isMetaRegion)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -683,6 +686,10 @@ public class HLog implements HConstants,
       this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
     }
+
+    // sync txn to file system
+    this.sync(isMetaRegion);
+
     if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
         listener.logRollRequested();
@@ -714,29 +721,34 @@ public class HLog implements HConstants,
    * @param now
    * @throws IOException
    */
-  public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    final long now)
+  public void append(byte [] regionName, byte [] tableName, WALEdit edits,
+    final long now, final boolean isMetaRegion)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum [] = obtainSeqNum(edits.size());
+    long seqNum = obtainSeqNum();
+
     synchronized (this.updateLock) {
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
      // memstore). . When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
+      this.lastSeqWritten.putIfAbsent(regionName, seqNum);
       int counter = 0;
-      for (KeyValue kv: edits) {
-        HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
-        doWrite(logKey, kv);
-        this.numEntries.incrementAndGet();
-      }
+
+      HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
+      doWrite(logKey, edits);
+      this.numEntries.incrementAndGet();
+
+      // Only count 1 row as an unflushed entry
       this.unflushedEntries.incrementAndGet();
     }
+
+    // sync txn to file system
+    this.sync(isMetaRegion);
+
     if (this.editsSize.get() > this.logrollsize) {
       requestLogRoll();
     }
@@ -920,7 +932,7 @@ public class HLog implements HConstants,
     }
   }
 
-  private void doWrite(HLogKey logKey, KeyValue logEdit)
+  private void doWrite(HLogKey logKey, WALEdit logEdit)
   throws IOException {
     if (!this.enabled) {
       return;
@@ -1002,7 +1014,8 @@ public class HLog implements HConstants,
    * @throws IOException
    */
   public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId)
+    final long logSeqId,
+    final boolean isMetaRegion)
   throws IOException {
     try {
       if (this.closed) {
@@ -1010,8 +1023,11 @@ public class HLog implements HConstants,
       }
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
+        KeyValue edit = completeCacheFlushLogEdit();
+        WALEdit edits = new WALEdit();
+        edits.add(edit);
         this.writer.append(makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
-          completeCacheFlushLogEdit());
+          edits);
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
@@ -1020,6 +1036,9 @@ public class HLog implements HConstants,
           this.lastSeqWritten.remove(regionName);
         }
       }
+      // sync txn to file system
+      this.sync(isMetaRegion);
+
     } finally {
       this.cacheFlushLock.unlock();
     }
@@ -1170,7 +1189,7 @@ public class HLog implements HConstants,
         in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
         try {
           HLogKey key = newKey(conf);
-          KeyValue val = new KeyValue();
+          WALEdit val = new WALEdit();
           while (in.next(key, val)) {
             byte [] regionName = key.getRegionName();
             LinkedList<HLogEntry> queue = logEntries.get(regionName);
@@ -1185,7 +1204,7 @@ public class HLog implements HConstants,
             // Make the key and value new each time; otherwise same instance
             // is used over and over.
             key = newKey(conf);
-            val = new KeyValue();
+            val = new WALEdit();
           }
           LOG.debug("Pushed=" + count + " entries from " +
             logfiles[i].getPath());
@@ -1254,7 +1273,7 @@ public class HLog implements HConstants,
               }
               SequenceFile.Writer w =
                 SequenceFile.createWriter(fs, conf, logfile,
-                  getKeyClass(conf), KeyValue.class, getCompressionType(conf));
+                  getKeyClass(conf), WALEdit.class, getCompressionType(conf));
               wap = new WriterAndPath(logfile, w);
               logWriters.put(key, wap);
               if (LOG.isDebugEnabled()) {
@@ -1265,13 +1284,15 @@ public class HLog implements HConstants,
               if (old != null) {
                 // Copy from existing log file
                 HLogKey oldkey = newKey(conf);
-                KeyValue oldval = new KeyValue();
+                WALEdit oldval = new WALEdit();
                 for (; old.next(oldkey, oldval); count++) {
                   if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
                     LOG.debug("Copied " + count + " edits");
                   }
                   w.append(oldkey, oldval);
+                  oldkey = newKey(conf);
+                  oldval = new WALEdit();
                 }
                 old.close();
                 fs.delete(oldlogfile, true);
@@ -1339,14 +1360,14 @@ public class HLog implements HConstants,
    * Only used when splitting logs
    */
   public static class HLogEntry {
-    private KeyValue edit;
+    private WALEdit edit;
     private HLogKey key;
     /**
      * Constructor for both params
      * @param edit log's edit
      * @param key log's key
      */
-    public HLogEntry(KeyValue edit, HLogKey key) {
+    public HLogEntry(WALEdit edit, HLogKey key) {
       super();
       this.edit = edit;
       this.key = key;
@@ -1355,7 +1376,7 @@ public class HLog implements HConstants,
      * Gets the edit
      * @return edit
      */
-    public KeyValue getEdit() {
+    public WALEdit getEdit() {
       return edit;
     }
     /**
@@ -1387,6 +1408,13 @@ public class HLog implements HConstants,
     if (!append) {
       return;
     }
+
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
     // Trying recovery
     boolean recovered = false;
     while (!recovered) {
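[Editorial note: the net effect of the HLog changes is that a row's edits go out as one WALEdit under one sequence number, and the append itself syncs before returning, which is why the per-call sync() invocations disappear from HRegionServer below. A condensed restatement of the new single-entry append path, assembled from the hunks above:]

    // One row transaction -> one HLogKey + one WALEdit -> one sync.
    public void append(byte[] regionName, byte[] tableName, WALEdit edits,
        long now, boolean isMetaRegion) throws IOException {
      long seqNum = obtainSeqNum();               // one seq num for the whole row
      synchronized (this.updateLock) {
        this.lastSeqWritten.putIfAbsent(regionName, seqNum);
        doWrite(makeKey(regionName, tableName, seqNum, now), edits);
        this.numEntries.incrementAndGet();
        this.unflushedEntries.incrementAndGet();  // one row == one unflushed entry
      }
      this.sync(isMetaRegion);                    // durable before the caller returns
      if (this.editsSize.get() > this.logrollsize) {
        requestLogRoll();
      }
    }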
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Mar 18 23:51:49 2010
@@ -57,6 +57,7 @@ package org.apache.hadoop.hbase.regionse
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -69,7 +70,7 @@ package org.apache.hadoop.hbase.regionse
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * HRegion stores data for a certain region of a table. It stores all columns
@@ -1015,7 +1016,8 @@ public class HRegion implements HConstan
     //     and that all updates to the log for this regionName that have lower
     //     log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(getRegionName(),
-      regionInfo.getTableDesc().getName(), completeSequenceId);
+      regionInfo.getTableDesc().getName(), completeSequenceId,
+      this.getRegionInfo().isMetaRegion());
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1184,11 +1186,10 @@ public class HRegion implements HConstan
           checkFamily(family);
         }
       }
-
-      for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
-        byte [] family = e.getKey();
-        delete(family, e.getValue(), writeToWAL);
-      }
+
+      // All edits for the given row (across all column families) must happen atomically.
+      delete(delete.getFamilyMap(), writeToWAL);
+
     } finally {
       if(lockid == null) releaseRowLock(lid);
       splitsAndClosesLock.readLock().unlock();
@@ -1198,12 +1199,11 @@ public class HRegion implements HConstan
 
   /**
-   * @param family
-   * @param kvs
+   * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
    * @throws IOException
    */
-  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
+  public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
   throws IOException {
     long now = System.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
@@ -1212,38 +1212,70 @@ public class HRegion implements HConstan
     try {
 
       if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs, now);
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
+
+        // bunch up all edits across all column families into a
+        // single WALEdit.
+        WALEdit walEdit = new WALEdit();
+        for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+          List<KeyValue> kvs = e.getValue();
+          for (KeyValue kv : kvs) {
+            walEdit.add(kv);
+          }
+        }
+        // append the edit to WAL. The append also does the sync.
+        if (!walEdit.isEmpty()) {
+          this.log.append(regionInfo.getRegionName(),
+            regionInfo.getTableDesc().getName(),
+            walEdit, now,
+            this.getRegionInfo().isMetaRegion());
+        }
       }
+
       long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: kvs) {
-        // Check if time is LATEST, change to time of most recent addition if so
-        // This is expensive.
-        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-          List<KeyValue> result = new ArrayList<KeyValue>(1);
-          Get g = new Get(kv.getRow());
-          NavigableSet<byte[]> qualifiers =
-            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-          byte [] q = kv.getQualifier();
-          if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
-          qualifiers.add(q);
-          get(store, g, qualifiers, result);
-          if (result.isEmpty()) {
-            // Nothing to delete
-            continue;
-          }
-          if (result.size() > 1) {
-            throw new RuntimeException("Unexpected size: " + result.size());
+
+      //
+      // Now make changes to the memstore.
+      //
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+        byte[] family = e.getKey();
+        List<KeyValue> kvs = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: kvs) {
+          // Check if time is LATEST, change to time of most recent addition if so
+          // This is expensive.
+          if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+            List<KeyValue> result = new ArrayList<KeyValue>(1);
+            Get g = new Get(kv.getRow());
+            NavigableSet<byte[]> qualifiers =
+              new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+            byte [] q = kv.getQualifier();
+            if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
+            qualifiers.add(q);
+            get(store, g, qualifiers, result);
+            if (result.isEmpty()) {
+              // Nothing to delete
+              continue;
+            }
+            if (result.size() > 1) {
+              throw new RuntimeException("Unexpected size: " + result.size());
            }
+            KeyValue getkv = result.get(0);
+            Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+              getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+          } else {
+            kv.updateLatestStamp(byteNow);
           }
-          KeyValue getkv = result.get(0);
-          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-        } else {
-          kv.updateLatestStamp(byteNow);
+          size = this.memstoreSize.addAndGet(store.delete(kv));
         }
-
-        size = this.memstoreSize.addAndGet(store.delete(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1309,15 +1341,8 @@ public class HRegion implements HConstan
     Integer lid = getLock(lockid, row);
     byte [] now = Bytes.toBytes(System.currentTimeMillis());
     try {
-      for (Map.Entry<byte[], List<KeyValue>> entry:
-          put.getFamilyMap().entrySet()) {
-        byte [] family = entry.getKey();
-        checkFamily(family);
-        List<KeyValue> puts = entry.getValue();
-        if (updateKeys(puts, now)) {
-          put(family, puts, writeToWAL);
-        }
-      }
+      // All edits for the given row (across all column families) must happen atomically.
+      put(put.getFamilyMap(), writeToWAL);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1377,16 +1402,9 @@ public class HRegion implements HConstan
         matches = Bytes.equals(expectedValue, actualValue);
       }
       //If matches put the new put
-      if(matches) {
-        for(Map.Entry<byte[], List<KeyValue>> entry :
-            put.getFamilyMap().entrySet()) {
-          byte [] fam = entry.getKey();
-          checkFamily(fam);
-          List<KeyValue> puts = entry.getValue();
-          if(updateKeys(puts, now)) {
-            put(fam, puts, writeToWAL);
-          }
-        }
+      if (matches) {
+        // All edits for the given row (across all column families) must happen atomically.
+        put(put.getFamilyMap(), writeToWAL);
         return true;
       }
       return false;
@@ -1496,34 +1514,76 @@ public class HRegion implements HConstan
    */
   private void put(final byte [] family, final List<KeyValue> edits)
   throws IOException {
-    this.put(family, edits, true);
+    Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
+    familyMap.put(family, edits);
+    this.put(familyMap, true);
   }
 
   /**
    * Add updates first to the hlog (if writeToWal) and then add values to memstore.
    * Warning: Assumption is caller has lock on passed in row.
-   * @param family
-   * @param edits
+   * @param familyMap map of family to edits for the given family.
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void put(final byte [] family, final List<KeyValue> edits,
+  private void put(final Map<byte[], List<KeyValue>> familyMap,
     boolean writeToWAL)
   throws IOException {
-    if (edits == null || edits.isEmpty()) {
-      return;
-    }
+    long now = System.currentTimeMillis();
+    byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
     try {
-      if (writeToWAL) {
-        long now = System.currentTimeMillis();
+
+      WALEdit walEdit = new WALEdit();
+
+      // check if column families are valid;
+      // check if any timestampupdates are needed;
+      // and if writeToWAL is set, then also collapse edits into a single list.
+      for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
+        List<KeyValue> edits = e.getValue();
+        byte[] family = e.getKey();
+
+        // is this a valid column family?
+        checkFamily(family);
+
+        // update timestamp on keys if required.
+        if (updateKeys(edits, byteNow)) {
+          if (writeToWAL) {
+            // bunch up all edits across all column families into a
+            // single WALEdit.
+            for (KeyValue kv : edits) {
+              walEdit.add(kv);
+            }
+          }
+        }
+      }
+
+      // append to and sync WAL
+      if (!walEdit.isEmpty()) {
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits, now);
+          regionInfo.getTableDesc().getName(),
+          walEdit, now,
+          this.getRegionInfo().isMetaRegion());
       }
+
       long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: edits) {
-        size = this.memstoreSize.addAndGet(store.add(kv));
+
+      // now make changes to the memstore
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey();
+        List<KeyValue> edits = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: edits) {
+          size = this.memstoreSize.addAndGet(store.add(kv));
+        }
       }
       flush = isFlushSize(size);
     } finally {
@@ -2485,10 +2545,11 @@ public class HRegion implements HConstan
       // now log it:
       if (writeToWAL) {
         long now = System.currentTimeMillis();
-        List<KeyValue> edits = new ArrayList<KeyValue>(1);
-        edits.add(newKv);
+        WALEdit walEdit = new WALEdit();
+        walEdit.add(newKv);
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits, now);
+          regionInfo.getTableDesc().getName(), walEdit, now,
+          this.getRegionInfo().isMetaRegion());
       }
 
       // Now request the ICV to the store, this will set the timestamp
@@ -2543,7 +2604,6 @@ public class HRegion implements HConstan
       (5 * Bytes.SIZEOF_BOOLEAN)) +
       (3 * ClassSize.REENTRANT_LOCK));
-
   @Override
   public long heapSize() {
     long heapSize = DEEP_OVERHEAD;
     for(Store store : this.stores.values()) {
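[Editorial note: the client-visible consequence of the HRegion changes is that a mutation spanning several column families is logged as a single WALEdit, so crash recovery can no longer apply half a row. A short illustration using the 0.20 client API; the table and family names are made up for the example:]

    HBaseConfiguration conf = new HBaseConfiguration();
    HTable table = new HTable(conf, "mytable");
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
    put.add(Bytes.toBytes("colfam2"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
    // One WALEdit in the log: after a crash, both cells survive or neither does.
    table.put(put);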
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Mar 18 23:51:49 2010
@@ -1763,7 +1763,6 @@ public class HRegionServer implements HC
       }
       region.put(put, getLockFromId(put.getLockId()));
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1796,9 +1795,7 @@ public class HRegionServer implements HC
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
-    // All have been processed successfully.
-    this.hlog.sync(isMetaRegion);
-
+
     if (i == puts.length) {
       return -1;
     } else {
@@ -1834,7 +1831,6 @@ public class HRegionServer implements HC
       boolean retval = region.checkAndPut(row, family, qualifier, value, put,
         getLockFromId(put.getLockId()), true);
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
       return retval;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -1996,7 +1992,6 @@ public class HRegionServer implements HC
       }
       Integer lid = getLockFromId(delete.getLockId());
       region.delete(delete, lid, writeToWAL);
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -2030,7 +2025,6 @@ public class HRegionServer implements HC
       throw convertThrowableToIOE(cleanup(t));
     }
 
-    this.hlog.sync(isMetaRegion);
     // All have been processed successfully.
     return -1;
   }
@@ -2489,7 +2483,6 @@ public class HRegionServer implements HC
       long retval = region.incrementColumnValue(row, family, qualifier, amount,
         writeToWAL);
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
       return retval;
     } catch (IOException e) {
       checkFileSystem();

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Mar 18 23:51:49 2010
@@ -291,7 +291,7 @@ public class Store implements HConstants
    *
    * We can ignore any log message that has a sequence ID that's equal to or
    * lower than maxSeqID.  (Because we know such log messages are already
-   * reflected in the MapFiles.)
+   * reflected in the HFiles.)
    *
    * @return the new max sequence id as per the log, or -1 if no log recovered
    */
@@ -320,13 +320,22 @@ public class Store implements HConstants
       reconstructionLog, this.conf);
     try {
       HLogKey key = HLog.newKey(conf);
-      KeyValue val = new KeyValue();
+      WALEdit edits = new WALEdit();
       long skippedEdits = 0;
       long editsCount = 0;
       // How many edits to apply before we send a progress report.
       int reportInterval =
         this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
-      while (logReader.next(key, val)) {
+
+      // TBD: Need to add an exception handler around logReader.next.
+      //
+      // A transaction now appears as a single edit. If logReader.next()
+      // returns an exception, then it must be a incomplete/partial
+      // transaction at the end of the file. Rather than bubble up
+      // the exception, we should catch it and simply ignore the
+      // partial transaction during this recovery phase.
+      //
+      while (logReader.next(key, edits)) {
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
@@ -335,15 +344,19 @@ public class Store implements HConstants
           skippedEdits++;
           continue;
         }
-        // Check this edit is for me. Also, guard against writing the special
-        // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (val.matchingFamily(HLog.METAFAMILY) ||
-          !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
-          !val.matchingFamily(family.getName())) {
-          continue;
+        for (KeyValue kv : edits.getKeyValues())
+        {
+          // Check this edit is for me. Also, guard against writing the special
+          // METACOLUMN info such as HBASE::CACHEFLUSH entries
+          if (kv.matchingFamily(HLog.METAFAMILY) ||
+            !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
+            !kv.matchingFamily(family.getName())) {
+            continue;
+          }
+          // Add anything as value as long as we use same instance each time.
+          reconstructedCache.add(kv);
         }
-        // Add anything as value as long as we use same instance each time.
-        reconstructedCache.add(val);
+
         editsCount++;
         // Every 2k edits, tell the reporter we're making progress.
        // Have seen 60k edits taking 3minutes to complete.
@@ -351,7 +364,7 @@ public class Store implements HConstants
           reporter.progress();
         }
         // Instantiate a new KeyValue to perform Writable on
-        val = new KeyValue();
+        edits = new WALEdit();
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
@@ -362,7 +375,7 @@ public class Store implements HConstants
     } finally {
       logReader.close();
     }
-    
+
     if (reconstructedCache.size() > 0) {
       // We create a "virtual flush" at maxSeqIdInLog+1.
       if (LOG.isDebugEnabled()) {
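[Editorial note: the TBD comment above proposes treating a read failure at the tail of the log as a partial transaction to skip rather than a fatal error. A hedged sketch of what that guard might look like; this is not in the commit, and EOFException as the signal and the log message are assumptions:]

    // Sketch: tolerate a truncated trailing entry during recovery.
    // A WALEdit is written atomically, so a partial record can only be
    // the last entry of a log cut short by a crash.
    boolean more = true;
    while (more) {
      try {
        more = logReader.next(key, edits);
      } catch (java.io.EOFException e) {
        LOG.warn("Ignoring partial transaction at tail of "
            + reconstructionLog, e);
        break;                  // skip the incomplete entry, keep what we have
      }
      if (!more) break;
      // ... apply 'edits' as in the loop above ...
    }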
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Thu Mar 18 23:51:49 2010
@@ -102,12 +102,12 @@ public class TestHLog extends HBaseTestC
     for (int ii = 0; ii < howmany; ii++) {
       for (int i = 0; i < howmany; i++) {
         for (int j = 0; j < howmany; j++) {
-          List<KeyValue> edit = new ArrayList<KeyValue>();
+          WALEdit edits = new WALEdit();
           byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
-          edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
+          edits.add(new KeyValue(rowName, column, System.currentTimeMillis(),
             column));
-          System.out.println("Region " + i + ": " + edit);
-          log.append(Bytes.toBytes("" + i), tableName, edit, System.currentTimeMillis());
+          System.out.println("Region " + i + ": " + edits);
+          log.append(Bytes.toBytes("" + i), tableName, edits, System.currentTimeMillis(), false);
         }
       }
       log.rollWriter();
@@ -131,11 +131,11 @@ public class TestHLog extends HBaseTestC
         new SequenceFile.Reader(this.fs, splits.get(i), this.conf);
       try {
         HLogKey key = new HLogKey();
-        KeyValue kv = new KeyValue();
+        WALEdit kvs = new WALEdit();
         int count = 0;
         String previousRegion = null;
         long seqno = -1;
-        while(r.next(key, kv)) {
+        while(r.next(key, kvs)) {
           String region = Bytes.toString(key.getRegionName());
           // Assert that all edits are for same region.
           if (previousRegion != null) {
@@ -144,8 +144,10 @@ public class TestHLog extends HBaseTestC
           assertTrue(seqno < key.getLogSeqNum());
           seqno = key.getLogSeqNum();
           previousRegion = region;
-          System.out.println(key + " " + kv);
+          System.out.println(key + " " + kvs);
           count++;
+          key = new HLogKey();
+          kvs = new WALEdit();
         }
         assertEquals(howmany * howmany, count);
       } finally {
@@ -168,31 +170,39 @@ public class TestHLog extends HBaseTestC
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...
       long timestamp = System.currentTimeMillis();
-      List<KeyValue> cols = new ArrayList<KeyValue>();
+      WALEdit cols = new WALEdit();
       for (int i = 0; i < COL_COUNT; i++) {
         cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      log.append(regionName, tableName, cols, System.currentTimeMillis());
+      log.append(regionName, tableName, cols, System.currentTimeMillis(), false);
       long logSeqId = log.startCacheFlush();
-      log.completeCacheFlush(regionName, tableName, logSeqId);
+      log.completeCacheFlush(regionName, tableName, logSeqId, false);
       log.close();
       Path filename = log.computeFilename(log.getFilenum());
       log = null;
       // Now open a reader on the log and assert append worked.
       reader = new SequenceFile.Reader(fs, filename, conf);
       HLogKey key = new HLogKey();
-      KeyValue val = new KeyValue();
-      for (int i = 0; i < COL_COUNT; i++) {
-        reader.next(key, val);
+      WALEdit vals = new WALEdit();
+      reader.next(key, vals);
+      assertEquals(COL_COUNT, vals.size());
+      int idx = 0;
+      for (KeyValue val : vals.getKeyValues()) {
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
-        assertEquals((byte)(i + '0'), val.getValue()[0]);
+        assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(key + " " + val);
+        idx++;
       }
-      while (reader.next(key, val)) {
-        // Assert only one more row... the meta flushed row.
+
+      // Get next row... the meta flushed row.
+      key = new HLogKey();
+      vals = new WALEdit();
+      reader.next(key, vals);
+      assertEquals(1, vals.size());
+      for (KeyValue val : vals.getKeyValues()) {
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Mar 18 23:51:49 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.Arrays;
 
@@ -385,7 +387,9 @@ public class TestHRegion extends HBaseTe
     //testing existing family
     byte [] family = fam2;
     try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
     } catch (Exception e) {
       assertTrue("Family " +new String(family)+ " does not exist", false);
     }
@@ -394,7 +398,9 @@ public class TestHRegion extends HBaseTe
     boolean ok = false;
     family = fam4;
     try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
     } catch (Exception e) {
       ok = true;
     }
@@ -595,7 +601,9 @@ public class TestHRegion extends HBaseTe
     kvs.add(new KeyValue(row1, fam1, col2, null));
     kvs.add(new KeyValue(row1, fam1, col3, null));
 
-    region.delete(fam1, kvs, true);
+    Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+    deleteMap.put(fam1, kvs);
+    region.delete(deleteMap, true);
 
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...
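[Editorial note: one subtlety the TestHLog changes highlight is that SequenceFile.Reader.next() reuses the key/value instances passed to it, so code that stores what it reads must allocate fresh objects per iteration, as the diff now does. A short illustration of the pitfall, with hypothetical variable names:]

    // Pitfall: every element of 'entries' ends up referencing the SAME two
    // objects, and so holds only the last record read from the log.
    List<HLog.HLogEntry> entries = new ArrayList<HLog.HLogEntry>();
    HLogKey key = new HLogKey();
    WALEdit val = new WALEdit();
    while (reader.next(key, val)) {
      entries.add(new HLog.HLogEntry(val, key)); // stores aliases, not copies
      // Fix (as the diff does): fresh instances for the next iteration.
      key = new HLogKey();
      val = new WALEdit();
    }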