hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r775418 - in /hadoop/hbase/trunk: ./ conf/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/regionserver/tableindexed/ src/java/org/apache/hadoop/hbase/regionserver/tra...
Date Sat, 16 May 2009 06:10:45 GMT
Author: stack
Date: Sat May 16 06:10:44 2009
New Revision: 775418

URL: http://svn.apache.org/viewvc?rev=775418&view=rev
Log:
HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on HLog#append?)

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat May 16 06:10:44 2009
@@ -249,6 +249,8 @@
    HBASE-1424  have shell print regioninfo and location on first load if
                DEBUG enabled
    HBASE-1008  [performance] The replay of logs on server crash takes way too long
+   HBASE-1394  Uploads sometimes fall to 0 requests/second (Binding up on
+               HLog#append?)
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Sat May 16 06:10:44 2009
@@ -175,14 +175,6 @@
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.maxlogentries</name>
-    <value>100000</value>
-    <description>Rotate the HRegion HLogs when count of entries exceeds this
-    value.  Default: 100,000.  Value is checked by a thread that runs every
-    hbase.server.thread.wakefrequency.
-    </description>
-  </property>
-  <property>
     <name>hbase.regionserver.flushlogentries</name>
     <value>100</value>
     <description>Sync the HLog to the HDFS when it has accumulated this many

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java Sat May 16 06:10:44
2009
@@ -267,7 +267,6 @@
         this.rootdir, this.conf);
       HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
         this.rootdir, this.conf);
-
       // Add first region from the META table to the ROOT region.
       HRegion.addRegionToMETA(root, meta);
       root.close();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 16
06:10:44 2009
@@ -104,7 +104,6 @@
   private final Path dir;
   private final Configuration conf;
   private final LogRollListener listener;
-  private final int maxlogentries;
   private final long optionalFlushInterval;
   private final long blocksize;
   private final int flushlogentries;
@@ -132,11 +131,17 @@
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
 
-  private volatile long filenum = 0;
+  private volatile long filenum = -1;
   private volatile long old_filenum = -1;
   
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
+  // Size of edits written so far. Used figuring when to rotate logs.
+  private final AtomicLong editsSize = new AtomicLong(0);
+
+  // If > than this size, roll the log.
+  private final long logrollsize;
+
   // This lock prevents starting a log roll during a cache flush.
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
@@ -144,7 +149,9 @@
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   private final Object updateLock = new Object();
-  
+
+  private final boolean enabled;
+
   /*
    * If more than this many logs, force flush of oldest region to oldest edit
    * goes to disk.  If too many and we crash, then will take forever replaying.
@@ -182,12 +189,13 @@
     this.dir = dir;
     this.conf = conf;
     this.listener = listener;
-    this.maxlogentries =
-      conf.getInt("hbase.regionserver.maxlogentries", 100000);
     this.flushlogentries =
       conf.getInt("hbase.regionserver.flushlogentries", 100);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
       this.fs.getDefaultBlockSize());
+    // Roll at 95% of block size.
+    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+    this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
     this.lastLogFlushTime = System.currentTimeMillis();
@@ -196,15 +204,16 @@
     }
     fs.mkdirs(dir);
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
+    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" + this.blocksize +
-      ", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
-      this.flushlogentries + ", optionallogflushinternal=" +
-      this.optionalFlushInterval + "ms");
+      ", rollsize=" + this.logrollsize +
+      ", enabled=" + this.enabled +
+      ", flushlogentries=" + this.flushlogentries +
+      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
   }
 
   /**
-   * Accessor for tests. Not a part of the public API.
    * @return Current state of the monotonically increasing file id.
    */
   public long getFilenum() {
@@ -212,17 +221,13 @@
   }
 
   /**
-   * Get the compression type for the hlog files.
-   * Commit logs SHOULD NOT be compressed.  You'll lose edits if the compression
-   * record is not complete.  In gzip, record is 32k so you could lose up to
-   * 32k of edits (All of this is moot till we have sync/flush in hdfs but
-   * still...).
+   * Get the compression type for the hlog files
    * @param c Configuration to use.
    * @return the kind of compression to use
    */
   private static CompressionType getCompressionType(final Configuration c) {
-    String name = c.get("hbase.io.seqfile.compression.type");
-    return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+    // Compression makes no sense for commit log.  Always return NONE.
+    return CompressionType.NONE;
   }
 
   /**
@@ -277,23 +282,24 @@
       }
       synchronized (updateLock) {
         // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter();
-        // Create a new one.
-        this.old_filenum = this.filenum;
+        Path oldFile = cleanupCurrentWriter(this.filenum);
+        if (this.filenum >= 0) {
+          this.old_filenum = this.filenum;
+        }
         this.filenum = System.currentTimeMillis();
         Path newPath = computeFilename(this.filenum);
-
         this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
           HLogKey.class, KeyValue.class,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), this.blocksize,
           SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
           new Metadata());
-
         LOG.info((oldFile != null?
-          "Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
-          "New log writer: " + FSUtils.getPath(newPath));
-
+            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+            this.numEntries.get() +
+            ", calcsize=" + this.editsSize.get() + ", filesize=" +
+            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+          "New hlog " + FSUtils.getPath(newPath));
         // Can we delete any of the old log files?
         if (this.outputfiles.size() > 0) {
           if (this.lastSeqWritten.size() <= 0) {
@@ -310,6 +316,7 @@
           }
         }
         this.numEntries.set(0);
+        this.editsSize.set(0);
         updateLock.notifyAll();
       }
     } finally {
@@ -337,7 +344,7 @@
     if (LOG.isDebugEnabled()) {
       // Find region associated with oldest key -- helps debugging.
       oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-      LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+      LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
         " out of total " + this.outputfiles.size() + "; " +
         "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
         " from region " + Bytes.toString(oldestRegion));
@@ -351,7 +358,7 @@
     if (countOfLogs > this.maxLogs) {
       regionToFlush = oldestRegion != null?
         oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
-      LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+      LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
         this.maxLogs + "; forcing flush of region with oldest edits: " +
         Bytes.toString(regionToFlush));
     }
@@ -382,7 +389,8 @@
    * @return Path to current writer or null if none.
    * @throws IOException
    */
-  private Path cleanupCurrentWriter() throws IOException {
+  private Path cleanupCurrentWriter(final long currentfilenum)
+  throws IOException {
     Path oldFile = null;
     if (this.writer != null) {
       // Close the current writer, get a new one.
@@ -393,12 +401,12 @@
         // shut ourselves down to minimize loss.  Alternative is to try and
         // keep going.  See HBASE-930.
         FailedLogCloseException flce =
-          new FailedLogCloseException("#" + this.filenum);
+          new FailedLogCloseException("#" + currentfilenum);
         flce.initCause(e);
         throw e; 
       }
-      oldFile = computeFilename(old_filenum);
-      if (filenum > 0) {
+      if (currentfilenum >= 0) {
+        oldFile = computeFilename(currentfilenum);
         this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
       }
     }
@@ -406,7 +414,7 @@
   }
 
   private void deleteLogFile(final Path p, final Long seqno) throws IOException {
-    LOG.info("removing old log file " + FSUtils.getPath(p) +
+    LOG.info("removing old hlog file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno);
     this.fs.delete(p, true);
   }
@@ -418,6 +426,7 @@
    * @return Path
    */
   public Path computeFilename(final long fn) {
+    if (fn < 0) return null;
     return new Path(dir, HLOG_DATFILE + fn);
   }
 
@@ -442,7 +451,7 @@
       synchronized (updateLock) {
         this.closed = true;
         if (LOG.isDebugEnabled()) {
-          LOG.debug("closing log writer in " + this.dir.toString());
+          LOG.debug("closing hlog writer in " + this.dir.toString());
         }
         this.writer.close();
         updateLock.notifyAll();
@@ -457,11 +466,12 @@
    * 
    * @param regionInfo
    * @param logEdit
+   * @param now
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
   throws IOException {
-    this.append(regionInfo, new byte[0], logEdit);
+    this.append(regionInfo, new byte[0], logEdit, now);
   }
 
   /** Append an entry to the log.
@@ -469,9 +479,11 @@
    * @param regionInfo
    * @param row
    * @param logEdit
+   * @param now Time of this edit write.
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
+    final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -485,14 +497,13 @@
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-      HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+      HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
       boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
-      doWrite(logKey, logEdit, sync);
+      doWrite(logKey, logEdit, sync, now);
       this.numEntries.incrementAndGet();
       updateLock.notifyAll();
     }
-
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
         listener.logRollRequested();
       }
@@ -520,10 +531,11 @@
    * @param tableName
    * @param edits
    * @param sync
+   * @param now
    * @throws IOException
    */
   void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    boolean sync)
+    boolean sync, final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -537,13 +549,14 @@
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
       int counter = 0;
       for (KeyValue kv: edits) {
-        HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
-        doWrite(logKey, kv, sync);
+        HLogKey logKey =
+          new HLogKey(regionName, tableName, seqNum[counter++], now);
+        doWrite(logKey, kv, sync, now);
         this.numEntries.incrementAndGet();
       }
       updateLock.notifyAll();
     }
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
         requestLogRoll();
     }
   }
@@ -558,19 +571,19 @@
     if (!this.closed) {
       long now = System.currentTimeMillis();
       synchronized (updateLock) {
-        if (((now - this.optionalFlushInterval) >
-            this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
+        if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
+            this.unflushedEntries.get() > 0) {
           try {
             sync();
           } catch (IOException e) {
-            LOG.error("Error flushing HLog", e);
+            LOG.error("Error flushing hlog", e);
           }
         }
       }
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms optional sync'ing HLog; editcount=" + this.numEntries.get());
+          "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
       }
     }
   }
@@ -581,10 +594,14 @@
     }
   }
   
-  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
+      final long now)
   throws IOException {
+    if (!this.enabled) {
+      return;
+    }
     try {
-      long now = System.currentTimeMillis();
+      this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
       this.writer.append(logKey, logEdit);
       if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
         sync();
@@ -592,10 +609,10 @@
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms appending an edit to HLog; editcount=" + this.numEntries.get());
+          "ms appending an edit to hlog; editcount=" + this.numEntries.get());
       }
     } catch (IOException e) {
-      LOG.fatal("Could not append. Requesting close of log", e);
+      LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();
       throw e;
     }
@@ -667,8 +684,8 @@
         return;
       }
       synchronized (updateLock) {
-        this.writer.append(new HLogKey(regionName, tableName, logSeqId),
-          completeCacheFlushLogEdit());
+        this.writer.append(new HLogKey(regionName, tableName, logSeqId,
+          System.currentTimeMillis()), completeCacheFlushLogEdit());
         this.numEntries.incrementAndGet();
         Long seq = this.lastSeqWritten.get(regionName);
         if (seq != null && logSeqId >= seq.longValue()) {
@@ -729,7 +746,7 @@
       // Nothing to do
       return;
     }
-    LOG.info("Splitting " + logfiles.length + " log(s) in " +
+    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
       srcDir.toString());
     splitLog(rootDir, logfiles, fs, conf);
     try {
@@ -741,7 +758,7 @@
       throw io;
     }
     long endMillis = System.currentTimeMillis();
-    LOG.info("log file splitting completed in " + (endMillis - millis) +
+    LOG.info("hlog file splitting completed in " + (endMillis - millis) +
         " millis for " + srcDir.toString());
   }
   
@@ -762,8 +779,8 @@
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
-            logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+            ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
         }
         // Check for possibly empty file. With appends, currently Hadoop reports
         // a zero length even if the file has been sync'd. Revisit if 
@@ -777,7 +794,7 @@
           try {
             int count = 0;
             while (in.next(key, val)) {
-              byte[] regionName = key.getRegionName();
+              byte [] regionName = key.getRegionName();
               LinkedList<HLogEntry> queue = logEntries.get(regionName);
               if (queue == null) {
                 queue = new LinkedList<HLogEntry>();
@@ -787,7 +804,8 @@
               queue.push(new HLogEntry(val, key));
               count++;
             }
-            LOG.debug("Pushed " + count + " entries");
+            LOG.debug("Pushed " + count + " entries from " +
+              logfiles[i].getPath());
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
@@ -797,7 +815,7 @@
           }
         } catch (IOException e) {
           if (length <= 0) {
-            LOG.warn("Empty log, continuing: " + logfiles[i]);
+            LOG.warn("Empty hlog, continuing: " + logfiles[i]);
             continue;
           }
           throw e;
@@ -838,7 +856,7 @@
                   Path oldlogfile = null;
                   SequenceFile.Reader old = null;
                   if (fs.exists(logfile)) {
-                    LOG.warn("Old log file " + logfile
+                    LOG.warn("Old hlog file " + logfile
                         + " already exists. Copying existing file to new file");
                     oldlogfile = new Path(logfile.toString() + ".old");
                     fs.rename(logfile, oldlogfile);
@@ -852,7 +870,7 @@
                   // iterate.
                   logWriters.put(key, w);
                   if (LOG.isDebugEnabled()) {
-                    LOG.debug("Creating new log file writer for path "
+                    LOG.debug("Creating new hlog file writer for path "
                         + logfile + " and region " + Bytes.toString(key));
                   }
 
@@ -893,10 +911,10 @@
       // Wait for all threads to terminate
       try {
         for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
-          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+          LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
         }
       }catch(InterruptedException ex) {
-        LOG.warn("Log writers were interrupted, possible data loss!");
+        LOG.warn("Hlog writers were interrupted, possible data loss!");
       }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sat May
16 06:10:44 2009
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.*;
 
@@ -34,14 +36,18 @@
  * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
  * associated row.
  */
-public class HLogKey implements WritableComparable<HLogKey> {
+public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
   private byte [] regionName;
   private byte [] tablename;
   private long logSeqNum;
+  // Time at which this edit was written.
+  private long writeTime;
+  private int HEAP_TAX = HeapSize.OBJECT + (2 * HeapSize.BYTE_ARRAY) +
+    (2 * HeapSize.LONG);
 
-  /** Create an empty key useful when deserializing */
+  /** Writable Consructor -- Do not use. */
   public HLogKey() {
-    this(null, null, 0L);
+    this(null, null, 0L, HConstants.LATEST_TIMESTAMP);
   }
   
   /**
@@ -52,12 +58,14 @@
    * @param regionName  - name of region
    * @param tablename   - name of table
    * @param logSeqNum   - log sequence number
+   * @param now Time at which this edit was written.
    */
   public HLogKey(final byte [] regionName, final byte [] tablename,
-      long logSeqNum) {
+      long logSeqNum, final long now) {
     this.regionName = regionName;
     this.tablename = tablename;
     this.logSeqNum = logSeqNum;
+    this.writeTime = now;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -78,7 +86,11 @@
   public long getLogSeqNum() {
     return logSeqNum;
   }
-  
+
+  public long getWriteTime() {
+    return this.writeTime;
+  }
+
   @Override
   public String toString() {
     return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -100,38 +112,44 @@
   public int hashCode() {
     int result = this.regionName.hashCode();
     result ^= this.logSeqNum;
+    result ^= this.writeTime;
     return result;
   }
 
-  //
-  // Comparable
-  //
-
   public int compareTo(HLogKey o) {
     int result = Bytes.compareTo(this.regionName, o.regionName);
-    if(result == 0) {
+    if (result == 0) {
       if (this.logSeqNum < o.logSeqNum) {
         result = -1;
       } else if (this.logSeqNum > o.logSeqNum) {
         result = 1;
       }
+      if (result == 0) {
+        if (this.writeTime < o.writeTime) {
+          result = -1;
+        } else if (this.writeTime > o.writeTime) {
+          return 1;
+        }
+      }
     }
     return result;
   }
 
-  //
-  // Writable
-  //
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.regionName);
     Bytes.writeByteArray(out, this.tablename);
     out.writeLong(logSeqNum);
+    out.writeLong(this.writeTime);
   }
   
   public void readFields(DataInput in) throws IOException {
     this.regionName = Bytes.readByteArray(in);
     this.tablename = Bytes.readByteArray(in);
     this.logSeqNum = in.readLong();
+    this.writeTime = in.readLong();
+  }
+
+  public long heapSize() {
+    return this.regionName.length + this.tablename.length + HEAP_TAX;
   }
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May
16 06:10:44 2009
@@ -1341,7 +1341,7 @@
           edits.add(kv);
         }
         if (!edits.isEmpty()) {
-          update(edits, writeToWAL);
+          update(edits, writeToWAL, now);
         }
         if (latestTimestampDeletes != null &&
             !latestTimestampDeletes.isEmpty()) {
@@ -1349,7 +1349,7 @@
           // as edits.  Need to do individually after figuring which is latest
           // timestamp to delete.
           for (byte [] column: latestTimestampDeletes) {
-            deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+            deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
           }
         }
       } finally {
@@ -1387,6 +1387,7 @@
     splitsAndClosesLock.readLock().lock();
     try {
       byte[] row = b.getRow();
+      long now = System.currentTimeMillis();
       Integer lid = getLock(lockid,row);
       try {
         NavigableSet<byte []> keySet =
@@ -1404,7 +1405,7 @@
         }
         if (success) {
           long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
-            System.currentTimeMillis(): b.getTimestamp();
+            now: b.getTimestamp();
           Set<byte []> latestTimestampDeletes = null;
           List<KeyValue> edits = new ArrayList<KeyValue>();
           for (BatchOperation op: b) {
@@ -1431,7 +1432,7 @@
             edits.add(kv);
           }
           if (!edits.isEmpty()) {
-            update(edits, writeToWAL);
+            update(edits, writeToWAL, now);
           }
           if (latestTimestampDeletes != null &&
               !latestTimestampDeletes.isEmpty()) {
@@ -1439,7 +1440,7 @@
             // as edits.  Need to do individually after figuring which is latest
             // timestamp to delete.
             for (byte [] column: latestTimestampDeletes) {
-              deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+              deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
             }
           }
         }
@@ -1530,7 +1531,7 @@
     try {
       // Delete ALL versions rather than column family VERSIONS.  If we just did
       // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
-      deleteMultiple(row, column, ts, ALL_VERSIONS);
+      deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1547,9 +1548,10 @@
   throws IOException {
     checkReadOnly();
     Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
     long time = ts;
     if (ts == HConstants.LATEST_TIMESTAMP) {
-      time = System.currentTimeMillis();
+      time = now;
     }
     KeyValue kv = KeyValue.createFirstOnRow(row, time);
     try {
@@ -1561,7 +1563,7 @@
           // This is UGLY. COPY OF KEY PART OF KeyValue.
           edits.add(key.cloneDelete());
         }
-        update(edits);
+        update(edits, now);
       }
     } finally {
       if (lockid == null) releaseRowLock(lid);
@@ -1594,7 +1596,7 @@
         for (KeyValue key: keyvalues) {
           edits.add(key.cloneDelete());
         }
-        update(edits);
+        update(edits, now);
       }
     } finally {
       if(lockid == null) releaseRowLock(lid);
@@ -1629,7 +1631,7 @@
       for (KeyValue kv: keyvalues) {
         edits.add(kv.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1668,7 +1670,7 @@
         for (KeyValue k: keyvalues) {
           edits.add(k.cloneDelete());
         }
-        update(edits);
+        update(edits, now);
       }
     } finally {
       if(lockid == null) releaseRowLock(lid);
@@ -1684,10 +1686,11 @@
    * @param ts Timestamp to start search on.
    * @param versions How many versions to delete. Pass
    * {@link HConstants#ALL_VERSIONS} to delete all.
+   * @param now
    * @throws IOException
    */
   private void deleteMultiple(final byte [] row, final byte [] column,
-      final long ts, final int versions)
+      final long ts, final int versions, final long now)
   throws IOException {
     checkReadOnly();
     // We used to have a getKeys method that purportedly only got the keys and
@@ -1704,7 +1707,7 @@
       for (KeyValue key: keys) {
         edits.add(key.cloneDelete());
       }
-      update(edits);
+      update(edits, now);
     }
   }
 
@@ -1748,10 +1751,12 @@
    * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
    * @param edits Cell updates by column
+   * @praram now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits) throws IOException {
-    this.update(edits, true);
+  private void update(final List<KeyValue> edits, final long now)
+  throws IOException {
+    this.update(edits, true, now);
   }
 
   /** 
@@ -1759,9 +1764,11 @@
    * Warning: Assumption is caller has lock on passed in row.
    * @param writeToWAL if true, then we should write to the log
    * @param updatesByColumn Cell updates by column
+   * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, boolean writeToWAL)
+  private void update(final List<KeyValue> edits, boolean writeToWAL,
+    final long now)
   throws IOException {
     if (edits == null || edits.isEmpty()) {
       return;
@@ -1772,7 +1779,7 @@
       if (writeToWAL) {
         this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), edits,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
+          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
       }
       long size = 0;
       for (KeyValue kv: edits) {
@@ -2273,7 +2280,7 @@
       List<KeyValue> edits = new ArrayList<KeyValue>();
       edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
         Writables.getBytes(r.getRegionInfo())));
-      meta.update(edits);
+      meta.update(edits, System.currentTimeMillis());
     } finally {
       meta.releaseRowLock(lid);
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat
May 16 06:10:44 2009
@@ -205,9 +205,9 @@
 
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  protected volatile HLog log;
-  LogRoller logRoller;
-  LogFlusher logFlusher;
+  protected volatile HLog hlog;
+  LogRoller hlogRoller;
+  LogFlusher hlogFlusher;
   
   // limit compactions while starting up
   CompactionLimitThread compactionLimitThread;
@@ -344,10 +344,10 @@
     this.compactSplitThread = new CompactSplitThread(this);
     
     // Log rolling thread
-    this.logRoller = new LogRoller(this);
+    this.hlogRoller = new LogRoller(this);
     
     // Log flushing thread
-    this.logFlusher =
+    this.hlogFlusher =
       new LogFlusher(this.threadWakeFrequency, this.stopRequested);
     
     // Background thread to check for major compactions; needed if region
@@ -513,14 +513,14 @@
                 if (checkFileSystem()) {
                   closeAllRegions();
                   try {
-                    log.closeAndDelete();
+                    hlog.closeAndDelete();
                   } catch (Exception e) {
                     LOG.error("error closing and deleting HLog", e);
                   }
                   try {
                     serverInfo.setStartCode(System.currentTimeMillis());
-                    log = setupHLog();
-                    this.logFlusher.setHLog(log);
+                    hlog = setupHLog();
+                    this.hlogFlusher.setHLog(hlog);
                   } catch (IOException e) {
                     this.abortRequested = true;
                     this.stopRequested.set(true);
@@ -620,17 +620,17 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
-    logFlusher.interrupt();
+    hlogFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
-    logRoller.interruptIfNecessary();
+    hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
 
     if (abortRequested) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
-          if (this.log != null) {
-            this.log.close();
+          if (this.hlog != null) {
+            this.hlog.close();
             LOG.info("On abort, closed hlog");
           }
         } catch (Throwable e) {
@@ -644,7 +644,7 @@
     } else {
       ArrayList<HRegion> closedRegions = closeAllRegions();
       try {
-        log.closeAndDelete();
+        hlog.closeAndDelete();
       } catch (Throwable e) {
         LOG.error("Close and delete failed",
           RemoteExceptionHandler.checkThrowable(e));
@@ -743,8 +743,8 @@
       this.hdfsShutdownThread = suppressHdfsShutdownHook();
 
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
-      this.log = setupHLog();
-      this.logFlusher.setHLog(log);
+      this.hlog = setupHLog();
+      this.hlogFlusher.setHLog(hlog);
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       startServiceThreads();
@@ -1058,7 +1058,7 @@
         "running at " + this.serverInfo.getServerAddress().toString() +
         " because logdir " + logdir.toString() + " exists");
     }
-    HLog newlog = new HLog(fs, logdir, conf, logRoller);
+    HLog newlog = new HLog(fs, logdir, conf, hlogRoller);
     return newlog;
   }
   
@@ -1127,9 +1127,9 @@
         LOG.fatal("Set stop flag in " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+    Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
         handler);
-    Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher",
+    Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
         handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
@@ -1199,7 +1199,7 @@
     }
     // Verify that all threads are alive
     if (!(leases.isAlive() && compactSplitThread.isAlive() &&
-        cacheFlusher.isAlive() && logRoller.isAlive() &&
+        cacheFlusher.isAlive() && hlogRoller.isAlive() &&
         workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       // One or more threads are no longer alive - shut down
       stop();
@@ -1234,7 +1234,7 @@
 
   /** @return the HLog */
   HLog getLog() {
-    return this.log;
+    return this.hlog;
   }
 
   /**
@@ -1270,7 +1270,7 @@
     Threads.shutdown(this.workerThread);
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
-    Threads.shutdown(this.logRoller);
+    Threads.shutdown(this.hlogRoller);
   }
 
   private boolean getMaster() {
@@ -1540,7 +1540,7 @@
       }
       this.lock.writeLock().lock();
       try {
-        this.log.setSequenceNumber(region.getMinSequenceId());
+        this.hlog.setSequenceNumber(region.getMinSequenceId());
         this.onlineRegions.put(mapKey, region);
       } finally {
         this.lock.writeLock().unlock();
@@ -1552,7 +1552,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
     HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
-        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
+        .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
         this.cacheFlusher);
     r.initialize(null,  new Progressable() {
       public void progress() {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
(original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
Sat May 16 06:10:44 2009
@@ -61,7 +61,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
     HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
-        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
         .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
     r.initialize(null, new Progressable() {
       public void progress() {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
(original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
Sat May 16 06:10:44 2009
@@ -100,7 +100,7 @@
     HLogEdit logEdit;
     logEdit = new HLogEdit(transactionId, TransactionalOperation.START);
 */
-    hlog.append(regionInfo, null/*logEdit*/);
+    hlog.append(regionInfo, null/*logEdit*/, System.currentTimeMillis());
   }
 
   /**
@@ -117,7 +117,7 @@
 
     for (BatchOperation op : update) {
       // COMMENTED OUT  HLogEdit logEdit = new HLogEdit(transactionId, update.getRow(), op,
commitTime);
-      hlog.append(regionInfo, update.getRow(), null /*logEdit*/);
+      hlog.append(regionInfo, update.getRow(), null /*logEdit*/, System.currentTimeMillis());
     }
   }
 
@@ -130,7 +130,7 @@
     logEdit = new HLogEdit(transactionId,
         HLogEdit.TransactionalOperation.COMMIT);
 */
-    hlog.append(regionInfo, null /*logEdit*/);
+    hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
   }
 
   /**
@@ -141,7 +141,7 @@
     /*HLogEdit logEdit;
     logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
 */
-    hlog.append(regionInfo, null /*logEdit*/);
+    hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
   }
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
(original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
Sat May 16 06:10:44 2009
@@ -111,7 +111,7 @@
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
     HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
-        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
         .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
     r.initialize(null, new Progressable() {
       public void progress() {

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat May
16 06:10:44 2009
@@ -77,7 +77,8 @@
             byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
             edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
               column));
-            log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit, false);
+            log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit,
+              false, System.currentTimeMillis());
           }
         }
         log.rollWriter();
@@ -110,7 +111,7 @@
         cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      log.append(regionName, tableName, cols, false);
+      log.append(regionName, tableName, cols, false, System.currentTimeMillis());
       long logSeqId = log.startCacheFlush();
       log.completeCacheFlush(regionName, tableName, logSeqId);
       log.close();



Mime
View raw message