Subject: svn commit: r775418 - in /hadoop/hbase/trunk: ./ conf/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/regionserver/tableindexed/ src/java/org/apache/hadoop/hbase/regionserver/tra...
To: hbase-commits@hadoop.apache.org
From: stack@apache.org
Date: Sat, 16 May 2009 06:10:45 -0000

Author: stack
Date: Sat May 16 06:10:44 2009
New Revision: 775418

URL: http://svn.apache.org/viewvc?rev=775418&view=rev
Log:
HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on HLog#append?)
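Note: the fix replaces count-based log rolling (a fixed maxlogentries) with size-based rolling. Each append adds the heap size of its key and edit to a running total, and a roll is requested once the total passes a threshold derived from the HDFS block size. A minimal sketch of that bookkeeping, with hypothetical names and none of HLog's locking:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: illustrates the new size-based roll accounting, not HLog itself.
    class SizeBasedRollTracker {
      private final AtomicLong editsSize = new AtomicLong(0); // bytes appended since last roll
      private final long logrollsize; // roll threshold, e.g. 95% of the HDFS block size

      SizeBasedRollTracker(final long blocksize, final float multiplier) {
        this.logrollsize = (long)(blocksize * multiplier);
      }

      /** Record an appended edit; true means a log roll should be requested. */
      boolean recordAppend(final long keyHeapSize, final long editHeapSize) {
        return this.editsSize.addAndGet(keyHeapSize + editHeapSize) > this.logrollsize;
      }

      /** Reset after the writer has been rolled. */
      void reset() {
        this.editsSize.set(0);
      }
    }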
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat May 16 06:10:44 2009
@@ -249,6 +249,8 @@
    HBASE-1424 have shell print regioninfo and location on first load if
               DEBUG enabled
    HBASE-1008 [performance] The replay of logs on server crash takes way
               too long
+   HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on
+              HLog#append?)

  OPTIMIZATIONS
    HBASE-1412 Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Sat May 16 06:10:44 2009
@@ -175,14 +175,6 @@
-    <name>hbase.regionserver.maxlogentries</name>
-    <value>100000</value>
-    <description>Rotate the HRegion HLogs when count of entries exceeds this
-    value. Default: 100,000. Value is checked by a thread that runs every
-    hbase.server.thread.wakefrequency.
-    </description>
-  </property>
-  <property>
     <name>hbase.regionserver.flushlogentries</name>
     <value>100</value>
     <description>Sync the HLog to the HDFS when it has accumulated this many
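Note: the removed hbase.regionserver.maxlogentries knob is superseded by a block-size multiplier read in HLog's constructor (see the HLog.java diff below). An operator who wanted smaller, more frequent hlogs could override the new properties in hbase-site.xml; the values here are purely illustrative, not recommendations:

    <property>
      <name>hbase.regionserver.logroll.multiplier</name>
      <value>0.60</value><!-- default is 0.95 of the block size -->
    </property>
    <property>
      <name>hbase.regionserver.hlog.blocksize</name>
      <value>33554432</value><!-- 32MB; defaults to the filesystem block size -->
    </property>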
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java Sat May 16 06:10:44 2009
@@ -267,7 +267,6 @@
       this.rootdir, this.conf);
     HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
       this.rootdir, this.conf);
-    // Add first region from the META table to the ROOT region.
     HRegion.addRegionToMETA(root, meta);
     root.close();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 16 06:10:44 2009
@@ -104,7 +104,6 @@
   private final Path dir;
   private final Configuration conf;
   private final LogRollListener listener;
-  private final int maxlogentries;
   private final long optionalFlushInterval;
   private final long blocksize;
   private final int flushlogentries;
@@ -132,11 +131,17 @@
   private final AtomicLong logSeqNum = new AtomicLong(0);

-  private volatile long filenum = 0;
+  private volatile long filenum = -1;
   private volatile long old_filenum = -1;
   private final AtomicInteger numEntries = new AtomicInteger(0);

+  // Size of edits written so far. Used figuring when to rotate logs.
+  private final AtomicLong editsSize = new AtomicLong(0);
+
+  // If > than this size, roll the log.
+  private final long logrollsize;
+
   // This lock prevents starting a log roll during a cache flush.
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
@@ -144,7 +149,9 @@
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   private final Object updateLock = new Object();
-
+
+  private final boolean enabled;
+
   /*
    * If more than this many logs, force flush of oldest region to oldest edit
    * goes to disk.  If too many and we crash, then will take forever replaying.
@@ -182,12 +189,13 @@
     this.dir = dir;
     this.conf = conf;
     this.listener = listener;
-    this.maxlogentries =
-      conf.getInt("hbase.regionserver.maxlogentries", 100000);
     this.flushlogentries =
      conf.getInt("hbase.regionserver.flushlogentries", 100);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
      this.fs.getDefaultBlockSize());
+    // Roll at 95% of block size.
+    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+    this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
      conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
     this.lastLogFlushTime = System.currentTimeMillis();
@@ -196,15 +204,16 @@
     }
     fs.mkdirs(dir);
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
+    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" + this.blocksize +
-      ", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
-      this.flushlogentries + ", optionallogflushinternal=" +
-      this.optionalFlushInterval + "ms");
+      ", rollsize=" + this.logrollsize +
+      ", enabled=" + this.enabled +
+      ", flushlogentries=" + this.flushlogentries +
+      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
   }

   /**
-   * Accessor for tests. Not a part of the public API.
    * @return Current state of the monotonically increasing file id.
    */
   public long getFilenum() {
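Note: the constructor hunk above also reads a new hbase.regionserver.hlog.enabled flag. When it is false, doWrite() (later in this diff) returns without writing, so every append becomes a no-op. A sketch of what that guard means, mirroring the class fields but not copying real HBase code:

    // Sketch of the new write-path guard: with the hlog disabled, an edit is
    // only as durable as the memcache, so a crash loses everything since the
    // last flush. Useful for measuring WAL overhead, unsafe for real data.
    private void doWriteSketch(HLogKey logKey, KeyValue logEdit) throws IOException {
      if (!this.enabled) {
        return; // WAL off: the edit is applied to memcache only
      }
      this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
      this.writer.append(logKey, logEdit);
    }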
@@ -212,17 +221,13 @@
   }

   /**
-   * Get the compression type for the hlog files.
-   * Commit logs SHOULD NOT be compressed. You'll lose edits if the compression
-   * record is not complete. In gzip, record is 32k so you could lose up to
-   * 32k of edits (All of this is moot till we have sync/flush in hdfs but
-   * still...).
+   * Get the compression type for the hlog files
    * @param c Configuration to use.
    * @return the kind of compression to use
    */
   private static CompressionType getCompressionType(final Configuration c) {
-    String name = c.get("hbase.io.seqfile.compression.type");
-    return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+    // Compression makes no sense for commit log. Always return NONE.
+    return CompressionType.NONE;
   }

   /**
@@ -277,23 +282,24 @@
     }
     synchronized (updateLock) {
       // Clean up current writer.
-      Path oldFile = cleanupCurrentWriter();
-      // Create a new one.
-      this.old_filenum = this.filenum;
+      Path oldFile = cleanupCurrentWriter(this.filenum);
+      if (this.filenum >= 0) {
+        this.old_filenum = this.filenum;
+      }
       this.filenum = System.currentTimeMillis();
       Path newPath = computeFilename(this.filenum);
-
       this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
         HLogKey.class, KeyValue.class,
         fs.getConf().getInt("io.file.buffer.size", 4096),
         fs.getDefaultReplication(), this.blocksize,
         SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
         new Metadata());
       LOG.info((oldFile != null?
-        "Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
-        "New log writer: " + FSUtils.getPath(newPath));
-
+        "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+        this.numEntries.get() +
+        ", calcsize=" + this.editsSize.get() + ", filesize=" +
+        this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+        "New hlog " + FSUtils.getPath(newPath));
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
         if (this.lastSeqWritten.size() <= 0) {
@@ -310,6 +316,7 @@
         }
       }
       this.numEntries.set(0);
+      this.editsSize.set(0);
       updateLock.notifyAll();
     }
   } finally {
@@ -337,7 +344,7 @@
     if (LOG.isDebugEnabled()) {
       // Find region associated with oldest key -- helps debugging.
       oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-      LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+      LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
         " out of total " + this.outputfiles.size() + "; " +
         "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
         " from region " + Bytes.toString(oldestRegion));
@@ -351,7 +358,7 @@
     if (countOfLogs > this.maxLogs) {
       regionToFlush = oldestRegion != null?
         oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
-      LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+      LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
         this.maxLogs + "; forcing flush of region with oldest edits: " +
         Bytes.toString(regionToFlush));
     }
@@ -382,7 +389,8 @@
    * @return Path to current writer or null if none.
    * @throws IOException
    */
-  private Path cleanupCurrentWriter() throws IOException {
+  private Path cleanupCurrentWriter(final long currentfilenum)
+  throws IOException {
     Path oldFile = null;
     if (this.writer != null) {
       // Close the current writer, get a new one.
@@ -393,12 +401,12 @@
         // shut ourselves down to minimize loss.  Alternative is to try and
         // keep going.  See HBASE-930.
         FailedLogCloseException flce =
-          new FailedLogCloseException("#" + this.filenum);
+          new FailedLogCloseException("#" + currentfilenum);
         flce.initCause(e);
         throw e;
       }
-      oldFile = computeFilename(old_filenum);
-      if (filenum > 0) {
+      if (currentfilenum >= 0) {
+        oldFile = computeFilename(currentfilenum);
         this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
       }
     }
@@ -406,7 +414,7 @@
   }

   private void deleteLogFile(final Path p, final Long seqno) throws IOException {
-    LOG.info("removing old log file " + FSUtils.getPath(p) +
+    LOG.info("removing old hlog file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno);
     this.fs.delete(p, true);
   }
@@ -418,6 +426,7 @@
    * @return Path
    */
   public Path computeFilename(final long fn) {
+    if (fn < 0) return null;
     return new Path(dir, HLOG_DATFILE + fn);
   }
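Note: filenum now starts at -1 instead of 0, so the very first rollWriter() no longer pretends there was a previous log to archive: computeFilename() returns null for the sentinel, and cleanupCurrentWriter() only records an old file for a non-negative id. The invariant in miniature (illustrative statements only):

    long filenum = -1;                          // sentinel: no hlog written yet
    assert computeFilename(filenum) == null;    // no path exists for the sentinel
    filenum = System.currentTimeMillis();       // first roll names the first file
    assert computeFilename(filenum) != null;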
@@ -442,7 +451,7 @@
     synchronized (updateLock) {
       this.closed = true;
       if (LOG.isDebugEnabled()) {
-        LOG.debug("closing log writer in " + this.dir.toString());
+        LOG.debug("closing hlog writer in " + this.dir.toString());
       }
       this.writer.close();
       updateLock.notifyAll();
@@ -457,11 +466,12 @@
    *
    * @param regionInfo
    * @param logEdit
+   * @param now
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
   throws IOException {
-    this.append(regionInfo, new byte[0], logEdit);
+    this.append(regionInfo, new byte[0], logEdit, now);
   }

   /** Append an entry to the log.
   *
@@ -469,9 +479,11 @@
   * @param regionInfo
   * @param row
   * @param logEdit
+  * @param now Time of this edit write.
   * @throws IOException
   */
-  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
+    final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -485,14 +497,13 @@
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-      HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+      HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
       boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
-      doWrite(logKey, logEdit, sync);
+      doWrite(logKey, logEdit, sync, now);
       this.numEntries.incrementAndGet();
       updateLock.notifyAll();
     }
-
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
         listener.logRollRequested();
       }
@@ -520,10 +531,11 @@
    * @param tableName
    * @param edits
    * @param sync
+   * @param now
    * @throws IOException
    */
   void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    boolean sync)
+    boolean sync, final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -537,13 +549,14 @@
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
       int counter = 0;
       for (KeyValue kv: edits) {
-        HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
-        doWrite(logKey, kv, sync);
+        HLogKey logKey =
+          new HLogKey(regionName, tableName, seqNum[counter++], now);
+        doWrite(logKey, kv, sync, now);
         this.numEntries.incrementAndGet();
       }
       updateLock.notifyAll();
     }
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
       requestLogRoll();
     }
   }
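Note: callers now supply the edit time once per batch instead of HLog reading the clock for every edit; the TestHLog changes at the bottom of this commit show the new shape. A hedged usage sketch (variable names invented for illustration):

    // One clock read covers the whole batch; every HLogKey in it carries the
    // same write time, and size-based roll accounting applies per edit.
    long now = System.currentTimeMillis();
    List<KeyValue> edits = buildEdits();   // hypothetical helper
    hlog.append(regionName, tableName, edits, false /* sync */, now);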
@@ -558,19 +571,19 @@
     if (!this.closed) {
       long now = System.currentTimeMillis();
       synchronized (updateLock) {
-        if (((now - this.optionalFlushInterval) >
-            this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
+        if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
+            this.unflushedEntries.get() > 0) {
           try {
             sync();
           } catch (IOException e) {
-            LOG.error("Error flushing HLog", e);
+            LOG.error("Error flushing hlog", e);
           }
         }
       }
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms optional sync'ing HLog; editcount=" + this.numEntries.get());
+          "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
       }
     }
   }
@@ -581,10 +594,14 @@
     }
   }

-  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
+    final long now)
   throws IOException {
+    if (!this.enabled) {
+      return;
+    }
     try {
-      long now = System.currentTimeMillis();
+      this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
       this.writer.append(logKey, logEdit);
       if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
         sync();
@@ -592,10 +609,10 @@
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms appending an edit to HLog; editcount=" + this.numEntries.get());
+          "ms appending an edit to hlog; editcount=" + this.numEntries.get());
       }
     } catch (IOException e) {
-      LOG.fatal("Could not append. Requesting close of log", e);
+      LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();
       throw e;
     }
@@ -667,8 +684,8 @@
       return;
     }
     synchronized (updateLock) {
-      this.writer.append(new HLogKey(regionName, tableName, logSeqId),
-        completeCacheFlushLogEdit());
+      this.writer.append(new HLogKey(regionName, tableName, logSeqId,
+        System.currentTimeMillis()), completeCacheFlushLogEdit());
       this.numEntries.incrementAndGet();
       Long seq = this.lastSeqWritten.get(regionName);
       if (seq != null && logSeqId >= seq.longValue()) {
@@ -729,7 +746,7 @@
       // Nothing to do
       return;
     }
-    LOG.info("Splitting " + logfiles.length + " log(s) in " +
+    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
       srcDir.toString());
     splitLog(rootDir, logfiles, fs, conf);
     try {
@@ -741,7 +758,7 @@
       throw io;
     }
     long endMillis = System.currentTimeMillis();
-    LOG.info("log file splitting completed in " + (endMillis - millis) +
+    LOG.info("hlog file splitting completed in " + (endMillis - millis) +
       " millis for " + srcDir.toString());
   }
@@ -762,8 +779,8 @@
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
-            logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+            ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
         }
         // Check for possibly empty file. With appends, currently Hadoop reports
         // a zero length even if the file has been sync'd. Revisit if
@@ -777,7 +794,7 @@
           try {
             int count = 0;
             while (in.next(key, val)) {
-              byte[] regionName = key.getRegionName();
+              byte [] regionName = key.getRegionName();
               LinkedList<HLogEntry> queue = logEntries.get(regionName);
               if (queue == null) {
                 queue = new LinkedList<HLogEntry>();
@@ -787,7 +804,8 @@
               queue.push(new HLogEntry(val, key));
               count++;
             }
-            LOG.debug("Pushed " + count + " entries");
+            LOG.debug("Pushed " + count + " entries from " +
+              logfiles[i].getPath());
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
@@ -797,7 +815,7 @@
         } catch (IOException e) {
           if (length <= 0) {
-            LOG.warn("Empty log, continuing: " + logfiles[i]);
+            LOG.warn("Empty hlog, continuing: " + logfiles[i]);
             continue;
           }
           throw e;
         }
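Note: the "Empty hlog" branch above encodes a caveat worth spelling out: with HDFS appends, the NameNode may report length 0 for a file that actually holds sync'd data, so an apparently empty log can only be skipped after a read attempt fails. Sketched under the assumption of a FileSystem fs, Path logPath, and Configuration conf in scope:

    // Length 0 does not prove the log is empty; only a failed read does.
    long length = fs.getFileStatus(logPath).getLen(); // may be 0 despite sync'd data
    try {
      SequenceFile.Reader in = new SequenceFile.Reader(fs, logPath, conf);
      // ... replay entries from `in`, then close it ...
    } catch (IOException e) {
      if (length <= 0) {
        LOG.warn("Empty hlog, continuing: " + logPath); // genuinely empty
      } else {
        throw e; // real data, real failure: propagate
      }
    }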
@@ -838,7 +856,7 @@
           Path oldlogfile = null;
           SequenceFile.Reader old = null;
           if (fs.exists(logfile)) {
-            LOG.warn("Old log file " + logfile
+            LOG.warn("Old hlog file " + logfile
               + " already exists. Copying existing file to new file");
             oldlogfile = new Path(logfile.toString() + ".old");
             fs.rename(logfile, oldlogfile);
@@ -852,7 +870,7 @@
             // iterate.
             logWriters.put(key, w);
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Creating new log file writer for path "
+              LOG.debug("Creating new hlog file writer for path "
                 + logfile + " and region " + Bytes.toString(key));
             }
@@ -893,10 +911,10 @@
       // Wait for all threads to terminate
       try {
         for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
-          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+          LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
         }
       }catch(InterruptedException ex) {
-        LOG.warn("Log writers were interrupted, possible data loss!");
+        LOG.warn("Hlog writers were interrupted, possible data loss!");
       }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sat May 16 06:10:44 2009
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.*;
@@ -34,14 +36,18 @@
  *
  * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
  * associated row.
  */
-public class HLogKey implements WritableComparable<HLogKey> {
+public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
   private byte [] regionName;
   private byte [] tablename;
   private long logSeqNum;
+  // Time at which this edit was written.
+  private long writeTime;
+  private int HEAP_TAX = HeapSize.OBJECT + (2 * HeapSize.BYTE_ARRAY) +
+    (2 * HeapSize.LONG);

-  /** Create an empty key useful when deserializing */
+  /** Writable Consructor -- Do not use. */
   public HLogKey() {
-    this(null, null, 0L);
+    this(null, null, 0L, HConstants.LATEST_TIMESTAMP);
   }
@@ -52,12 +58,14 @@
    * @param regionName - name of region
    * @param tablename  - name of table
    * @param logSeqNum  - log sequence number
+   * @param now Time at which this edit was written.
    */
   public HLogKey(final byte [] regionName, final byte [] tablename,
-      long logSeqNum) {
+      long logSeqNum, final long now) {
     this.regionName = regionName;
     this.tablename = tablename;
     this.logSeqNum = logSeqNum;
+    this.writeTime = now;
   }

 //////////////////////////////////////////////////////////////////////////////
@@ -78,7 +86,11 @@
   public long getLogSeqNum() {
     return logSeqNum;
   }
-
+
+  public long getWriteTime() {
+    return this.writeTime;
+  }
+
   @Override
   public String toString() {
     return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -100,38 +112,44 @@
   public int hashCode() {
     int result = this.regionName.hashCode();
     result ^= this.logSeqNum;
+    result ^= this.writeTime;
     return result;
   }

-  //
-  // Comparable
-  //
-
   public int compareTo(HLogKey o) {
     int result = Bytes.compareTo(this.regionName, o.regionName);
-    if(result == 0) {
+    if (result == 0) {
       if (this.logSeqNum < o.logSeqNum) {
         result = -1;
       } else if (this.logSeqNum > o.logSeqNum) {
         result = 1;
       }
+      if (result == 0) {
+        if (this.writeTime < o.writeTime) {
+          result = -1;
+        } else if (this.writeTime > o.writeTime) {
+          return 1;
+        }
+      }
     }
     return result;
   }

-  //
-  // Writable
-  //
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.regionName);
     Bytes.writeByteArray(out, this.tablename);
     out.writeLong(logSeqNum);
+    out.writeLong(this.writeTime);
   }

   public void readFields(DataInput in) throws IOException {
     this.regionName = Bytes.readByteArray(in);
     this.tablename = Bytes.readByteArray(in);
     this.logSeqNum = in.readLong();
+    this.writeTime = in.readLong();
+  }
+
+  public long heapSize() {
+    return this.regionName.length + this.tablename.length + HEAP_TAX;
   }
 }
\ No newline at end of file
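Note: HLogKey now carries a writeTime that participates in write()/readFields() and breaks ties in compareTo(). A quick way one might check the new field round-trips, using only the constructor and methods shown in the diff above (statements only; java.io and Bytes imports assumed):

    // Round-trip sketch: writeTime should survive serialization, and the
    // compareTo() tiebreak should then see the two keys as equal.
    HLogKey before = new HLogKey(Bytes.toBytes("region"), Bytes.toBytes("table"),
      42L, System.currentTimeMillis());
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    before.write(new DataOutputStream(buf));
    HLogKey after = new HLogKey();
    after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    assert after.getWriteTime() == before.getWriteTime();
    assert before.compareTo(after) == 0;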
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 16 06:10:44 2009
@@ -1341,7 +1341,7 @@
         edits.add(kv);
       }
       if (!edits.isEmpty()) {
-        update(edits, writeToWAL);
+        update(edits, writeToWAL, now);
       }
       if (latestTimestampDeletes != null &&
           !latestTimestampDeletes.isEmpty()) {
@@ -1349,7 +1349,7 @@
         // as edits. Need to do individually after figuring which is latest
         // timestamp to delete.
         for (byte [] column: latestTimestampDeletes) {
-          deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+          deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
         }
       }
     } finally {
@@ -1387,6 +1387,7 @@
     splitsAndClosesLock.readLock().lock();
     try {
       byte[] row = b.getRow();
+      long now = System.currentTimeMillis();
       Integer lid = getLock(lockid,row);
       try {
         NavigableSet<byte []> keySet =
@@ -1404,7 +1405,7 @@
         }
         if (success) {
           long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
-            System.currentTimeMillis(): b.getTimestamp();
+            now: b.getTimestamp();
           Set<byte []> latestTimestampDeletes = null;
           List<KeyValue> edits = new ArrayList<KeyValue>();
           for (BatchOperation op: b) {
@@ -1431,7 +1432,7 @@
             edits.add(kv);
           }
           if (!edits.isEmpty()) {
-            update(edits, writeToWAL);
+            update(edits, writeToWAL, now);
           }
           if (latestTimestampDeletes != null &&
               !latestTimestampDeletes.isEmpty()) {
@@ -1439,7 +1440,7 @@
             // as edits. Need to do individually after figuring which is latest
             // timestamp to delete.
             for (byte [] column: latestTimestampDeletes) {
-              deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+              deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
             }
           }
         }
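Note: the HRegion hunks above and below all apply the same pattern: read the clock once per operation and thread that value through to every edit and the WAL append, instead of calling System.currentTimeMillis() per edit. Illustrative only (not HBase API), the pattern in miniature:

    // One clock read stamps the whole batch; the WAL append reuses it too.
    static long[] stampBatch(final int nEdits) {
      final long now = System.currentTimeMillis(); // single read per operation
      long[] timestamps = new long[nEdits];
      for (int i = 0; i < nEdits; i++) {
        timestamps[i] = now; // every edit in the batch shares the same time
      }
      return timestamps;
    }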
@@ -1530,7 +1531,7 @@
     try {
       // Delete ALL versions rather than column family VERSIONS. If we just did
       // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
-      deleteMultiple(row, column, ts, ALL_VERSIONS);
+      deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1547,9 +1548,10 @@
   throws IOException {
     checkReadOnly();
     Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
     long time = ts;
     if (ts == HConstants.LATEST_TIMESTAMP) {
-      time = System.currentTimeMillis();
+      time = now;
     }
     KeyValue kv = KeyValue.createFirstOnRow(row, time);
     try {
@@ -1561,7 +1563,7 @@
         // This is UGLY. COPY OF KEY PART OF KeyValue.
         edits.add(key.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     }
   } finally {
     if (lockid == null) releaseRowLock(lid);
@@ -1594,7 +1596,7 @@
       for (KeyValue key: keyvalues) {
         edits.add(key.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     }
   } finally {
     if(lockid == null) releaseRowLock(lid);
@@ -1629,7 +1631,7 @@
       for (KeyValue kv: keyvalues) {
         edits.add(kv.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1668,7 +1670,7 @@
       for (KeyValue k: keyvalues) {
         edits.add(k.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     }
   } finally {
     if(lockid == null) releaseRowLock(lid);
@@ -1684,10 +1686,11 @@
    * @param ts Timestamp to start search on.
    * @param versions How many versions to delete. Pass
    * {@link HConstants#ALL_VERSIONS} to delete all.
+   * @param now
    * @throws IOException
    */
   private void deleteMultiple(final byte [] row, final byte [] column,
-      final long ts, final int versions)
+      final long ts, final int versions, final long now)
   throws IOException {
     checkReadOnly();
     // We used to have a getKeys method that purportedly only got the keys and
@@ -1704,7 +1707,7 @@
       for (KeyValue key: keys) {
         edits.add(key.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     }
   }
@@ -1748,10 +1751,12 @@
    * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
    * @param edits Cell updates by column
+   * @praram now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits) throws IOException {
-    this.update(edits, true);
+  private void update(final List<KeyValue> edits, final long now)
+  throws IOException {
+    this.update(edits, true, now);
   }
@@ -1759,9 +1764,11 @@
    * Warning: Assumption is caller has lock on passed in row.
    * @param writeToWAL if true, then we should write to the log
    * @param updatesByColumn Cell updates by column
+   * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, boolean writeToWAL)
+  private void update(final List<KeyValue> edits, boolean writeToWAL,
+    final long now)
   throws IOException {
     if (edits == null || edits.isEmpty()) {
       return;
     }
@@ -1772,7 +1779,7 @@
     if (writeToWAL) {
       this.log.append(regionInfo.getRegionName(),
         regionInfo.getTableDesc().getName(), edits,
-        (regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
+        (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
     }
     long size = 0;
     for (KeyValue kv: edits) {
@@ -2273,7 +2280,7 @@
     List<KeyValue> edits = new ArrayList<KeyValue>();
     edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
       Writables.getBytes(r.getRegionInfo())));
-    meta.update(edits);
+    meta.update(edits, System.currentTimeMillis());
   } finally {
     meta.releaseRowLock(lid);
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 16 06:10:44 2009
@@ -205,9 +205,9 @@
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  protected volatile HLog log;
-  LogRoller logRoller;
-  LogFlusher logFlusher;
+  protected volatile HLog hlog;
+  LogRoller hlogRoller;
+  LogFlusher hlogFlusher;

   // limit compactions while starting up
   CompactionLimitThread compactionLimitThread;
@@ -344,10 +344,10 @@
     this.compactSplitThread = new CompactSplitThread(this);

     // Log rolling thread
-    this.logRoller = new LogRoller(this);
+    this.hlogRoller = new LogRoller(this);

     // Log flushing thread
-    this.logFlusher =
+    this.hlogFlusher =
       new LogFlusher(this.threadWakeFrequency, this.stopRequested);

     // Background thread to check for major compactions; needed if region
@@ -513,14 +513,14 @@
       if (checkFileSystem()) {
         closeAllRegions();
         try {
-          log.closeAndDelete();
+          hlog.closeAndDelete();
         } catch (Exception e) {
           LOG.error("error closing and deleting HLog", e);
         }
         try {
           serverInfo.setStartCode(System.currentTimeMillis());
-          log = setupHLog();
-          this.logFlusher.setHLog(log);
+          hlog = setupHLog();
+          this.hlogFlusher.setHLog(hlog);
         } catch (IOException e) {
           this.abortRequested = true;
           this.stopRequested.set(true);
@@ -620,17 +620,17 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
-    logFlusher.interrupt();
+    hlogFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
-    logRoller.interruptIfNecessary();
+    hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();

     if (abortRequested) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
-          if (this.log != null) {
-            this.log.close();
+          if (this.hlog != null) {
+            this.hlog.close();
             LOG.info("On abort, closed hlog");
           }
         } catch (Throwable e) {
@@ -644,7 +644,7 @@
     } else {
       ArrayList<HRegion> closedRegions = closeAllRegions();
       try {
-        log.closeAndDelete();
+        hlog.closeAndDelete();
       } catch (Throwable e) {
         LOG.error("Close and delete failed",
           RemoteExceptionHandler.checkThrowable(e));
       }
@@ -743,8 +743,8 @@
       this.hdfsShutdownThread = suppressHdfsShutdownHook();

       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
-      this.log = setupHLog();
-      this.logFlusher.setHLog(log);
+      this.hlog = setupHLog();
+      this.hlogFlusher.setHLog(hlog);
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       startServiceThreads();
@@ -1058,7 +1058,7 @@
         "running at " + this.serverInfo.getServerAddress().toString() +
         " because logdir " + logdir.toString() + " exists");
     }
-    HLog newlog = new HLog(fs, logdir, conf, logRoller);
+    HLog newlog = new HLog(fs, logdir, conf, hlogRoller);
     return newlog;
   }
@@ -1127,9 +1127,9 @@
         LOG.fatal("Set stop flag in " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+    Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
       handler);
-    Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher",
+    Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
       handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
@@ -1199,7 +1199,7 @@
     }
     // Verify that all threads are alive
     if (!(leases.isAlive() && compactSplitThread.isAlive() &&
-        cacheFlusher.isAlive() && logRoller.isAlive() &&
+        cacheFlusher.isAlive() && hlogRoller.isAlive() &&
         workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       // One or more threads are no longer alive - shut down
       stop();
@@ -1234,7 +1234,7 @@
   /** @return the HLog */
   HLog getLog() {
-    return this.log;
+    return this.hlog;
   }
@@ -1270,7 +1270,7 @@
     Threads.shutdown(this.workerThread);
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
-    Threads.shutdown(this.logRoller);
+    Threads.shutdown(this.hlogRoller);
   }

   private boolean getMaster() {
@@ -1540,7 +1540,7 @@
     }
     this.lock.writeLock().lock();
     try {
-      this.log.setSequenceNumber(region.getMinSequenceId());
+      this.hlog.setSequenceNumber(region.getMinSequenceId());
       this.onlineRegions.put(mapKey, region);
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -1552,7 +1552,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
   throws IOException {
     HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
-        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
+        .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
         this.cacheFlusher);
     r.initialize(null, new Progressable() {
       public void progress() {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java Sat May 16 06:10:44 2009
@@ -61,7 +61,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
   throws IOException {
     HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
-        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
         .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
     r.initialize(null, new Progressable() {
       public void progress() {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java Sat May 16 06:10:44 2009
@@ -100,7 +100,7 @@
     HLogEdit logEdit;
     logEdit = new HLogEdit(transactionId, TransactionalOperation.START);
     */
-    hlog.append(regionInfo, null/*logEdit*/);
+    hlog.append(regionInfo, null/*logEdit*/, System.currentTimeMillis());
   }

   /**
@@ -117,7 +117,7 @@
     for (BatchOperation op : update) {
       // COMMENTED OUT HLogEdit logEdit = new HLogEdit(transactionId,
       // update.getRow(), op, commitTime);
-      hlog.append(regionInfo, update.getRow(), null /*logEdit*/);
+      hlog.append(regionInfo, update.getRow(), null /*logEdit*/, System.currentTimeMillis());
     }
   }
@@ -130,7 +130,7 @@
     logEdit = new HLogEdit(transactionId,
       HLogEdit.TransactionalOperation.COMMIT);
     */
-    hlog.append(regionInfo, null /*logEdit*/);
+    hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
   }

   /**
@@ -141,7 +141,7 @@
     /*HLogEdit logEdit;
     logEdit = new HLogEdit(transactionId,
       HLogEdit.TransactionalOperation.ABORT);
     */
-    hlog.append(regionInfo, null /*logEdit*/);
+    hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
   }

   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sat May 16 06:10:44 2009
@@ -111,7 +111,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
   throws IOException {
     HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
-        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
         .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
     r.initialize(null, new Progressable() {
       public void progress() {
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat May 16 06:10:44 2009
@@ -77,7 +77,8 @@
           byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
           edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
             column));
-          log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit, false);
+          log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit,
+            false, System.currentTimeMillis());
         }
       }
       log.rollWriter();
@@ -110,7 +111,7 @@
         cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      log.append(regionName, tableName, cols, false);
+      log.append(regionName, tableName, cols, false, System.currentTimeMillis());
      long logSeqId = log.startCacheFlush();
      log.completeCacheFlush(regionName, tableName, logSeqId);
      log.close();
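Note: a reader-side check one could add alongside these tests to verify the new write time actually persists, mirroring how splitLog() in this commit reads entries back. Assumptions: fs, logPath, and conf are in scope, and KeyValue can be instantiated empty for deserialization the way splitLog() does.

    // Read the hlog back and confirm every key was stamped by append().
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, logPath, conf);
    HLogKey key = new HLogKey();
    KeyValue val = new KeyValue();
    try {
      while (reader.next(key, val)) {
        assertTrue(key.getWriteTime() > 0); // set from `now` at append time
      }
    } finally {
      reader.close();
    }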