hadoop-common-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r583839 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/HLog.java src/test/org/apache/hadoop/hbase/TestLogRolling.java
Date: Thu, 11 Oct 2007 14:45:36 GMT
Author: stack
Date: Thu Oct 11 07:45:36 2007
New Revision: 583839

URL: http://svn.apache.org/viewvc?rev=583839&view=rev
Log:
HADOOP-2029 TestLogRolling fails too often in patch and nightlies

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Oct 11 07:45:36 2007
@@ -74,6 +74,7 @@
                 daemon scripts
     HADOOP-2017 TestRegionServerAbort failure in patch build #903 and
                 nightly #266
+    HADOOP-2029 TestLogRolling fails too often in patch and nightlies
 
   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Thu Oct 11 07:45:36 2007
@@ -83,26 +83,28 @@
  */
 public class HLog implements HConstants {
   private static final Log LOG = LogFactory.getLog(HLog.class);
-
-  static final String HLOG_DATFILE = "hlog.dat.";
-
+  private static final String HLOG_DATFILE = "hlog.dat.";
   static final Text METACOLUMN = new Text("METACOLUMN:");
-
   static final Text METAROW = new Text("METAROW");
-
-  FileSystem fs;
-
-  Path dir;
-
-  Configuration conf;
-
+  final FileSystem fs;
+  final Path dir;
+  final Configuration conf;
   final long threadWakeFrequency;
 
+  /*
+   * Current log file.
+   */
   SequenceFile.Writer writer;
 
-  TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
+  /*
+   * Map of all log files but the current one. 
+   */
+  final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
 
-  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+  /*
+   * Map of region to last sequence/edit id. 
+   */
+  final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
 
   volatile boolean closed = false;
 
@@ -129,11 +131,12 @@
    * @throws IOException
    */
   static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
-      Configuration conf) throws IOException {
+    Configuration conf)
+  throws IOException {
     Path logfiles[] = fs.listPaths(new Path[] { srcDir });
     LOG.info("splitting " + logfiles.length + " log(s) in " +
       srcDir.toString());
-    HashMap<Text, SequenceFile.Writer> logWriters =
+    Map<Text, SequenceFile.Writer> logWriters =
       new HashMap<Text, SequenceFile.Writer>();
     try {
       for (int i = 0; i < logfiles.length; i++) {
@@ -156,12 +159,12 @@
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
               Path logfile = new Path(HRegion.getRegionDir(rootDir,
-                  regionName), HREGION_OLDLOGFILE_NAME);
+                regionName), HREGION_OLDLOGFILE_NAME);
               if (LOG.isDebugEnabled()) {
                 LOG.debug("getting new log file writer for path " + logfile);
               }
               w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-                  HLogEdit.class);
+                HLogEdit.class);
               logWriters.put(regionName, w);
             }
             if (LOG.isDebugEnabled()) {
@@ -202,12 +205,12 @@
    * @param conf
    * @throws IOException
    */
-  HLog(FileSystem fs, Path dir, Configuration conf) throws IOException {
+  HLog(final FileSystem fs, final Path dir, final Configuration conf)
+  throws IOException {
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -242,7 +245,7 @@
    * flush cannot start when the log is being rolled and the log cannot be
    * rolled during a cache flush.
    *
-   * Note that this method cannot be synchronized because it is possible that
+   * <p>Note that this method cannot be synchronized because it is possible that
    * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
    * start which would obtain the lock on this but block on obtaining the
    * cacheFlushLock and then completeCacheFlush could be called which would wait
@@ -253,81 +256,94 @@
   synchronized void rollWriter() throws IOException {
     boolean locked = false;
     while (!locked && !closed) {
-      if (cacheFlushLock.tryLock()) {
+      if (this.cacheFlushLock.tryLock()) {
         locked = true;
         break;
       }
       try {
         this.wait(threadWakeFrequency);
       } catch (InterruptedException e) {
+        // continue
       }
     }
     if (closed) {
       if (locked) {
-        cacheFlushLock.unlock();
+        this.cacheFlushLock.unlock();
       }
       throw new IOException("Cannot roll log; log is closed");
     }
 
     // If we get here we have locked out both cache flushes and appends
-
     try {
-      if (writer != null) {
+      if (this.writer != null) {
         // Close the current writer, get a new one.
-        writer.close();
+        this.writer.close();
         Path p = computeFilename(filenum - 1);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Closing current log writer " + p.toString() +
-              " to get a new one");
+            " to get a new one");
         }
         if (filenum > 0) {
-          synchronized (sequenceLock) {
-            outputfiles.put(logSeqNum - 1, p);
+          synchronized (this.sequenceLock) {
+            this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
           }
         }
       }
       Path newPath = computeFilename(filenum++);
-      this.writer = SequenceFile.createWriter(fs, conf, newPath,
+      this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
           HLogKey.class, HLogEdit.class);
-
       LOG.info("new log writer created at " + newPath);
 
       // Can we delete any of the old log files?
-
-      TreeSet<Long> sequenceNumbers =
-        new TreeSet<Long>(lastSeqWritten.values());
-
-      if (sequenceNumbers.size() > 0) {
-        long oldestOutstandingSeqNum = sequenceNumbers.first();
-
-        // Get the set of all log files whose final ID is older than the oldest
-        // pending region operation
-
-        sequenceNumbers.clear();
-        sequenceNumbers.addAll(outputfiles.headMap(
-            oldestOutstandingSeqNum).keySet());
-
-        // Now remove old log files (if any)
-
-        for (Long seq : sequenceNumbers) {
-          Path p = outputfiles.remove(seq);
-          LOG.info("removing old log file " + p.toString());
-          fs.delete(p);
+      if (this.outputfiles.size() > 0) {
+        if (this.lastSeqWritten.size() <= 0) {
+          LOG.debug("Last sequence written is empty. Deleting all old hlogs");
+          // If so, then no new writes have come in since all regions were
+          // flushed (and removed from the lastSeqWritten map). Means can
+          // remove all but currently open log file.
+          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+            deleteLogFile(e.getValue(), e.getKey());
+          }
+          this.outputfiles.clear();
+        } else {
+          // Get oldest edit/sequence id.  If logs are older than this id,
+          // then safe to remove.
+          TreeSet<Long> sequenceNumbers =
+            new TreeSet<Long>(this.lastSeqWritten.values());
+          long oldestOutstandingSeqNum = sequenceNumbers.first().longValue();
+          // Get the set of all log files whose final ID is older than the
+          // oldest pending region operation
+          sequenceNumbers.clear();
+          sequenceNumbers.addAll(this.outputfiles.headMap(
+              Long.valueOf(oldestOutstandingSeqNum)).keySet());
+          // Now remove old log files (if any)
+          LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+            "using oldest outstanding seqnum of " + oldestOutstandingSeqNum);
+          for (Long seq : sequenceNumbers) {
+            deleteLogFile(this.outputfiles.remove(seq), seq);
+          }
         }
       }
       this.numEntries = 0;
-
     } finally {
-      cacheFlushLock.unlock();
+      this.cacheFlushLock.unlock();
     }
   }
+  
+  private void deleteLogFile(final Path p, final Long seqno)
+  throws IOException {
+    LOG.info("removing old log file " + p.toString() +
+      " whose highest sequence/edit id is " + seqno);
+    this.fs.delete(p);
+  }
 
   /**
    * This is a convenience method that computes a new filename with a given
    * file-number.
    */
   Path computeFilename(final long fn) {
-    return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
+    return new Path(dir,
+      HLOG_DATFILE + String.format("%1$03d", Long.valueOf(fn)));
   }
 
   /**
@@ -378,27 +394,26 @@
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-      TreeMap<Text, byte[]> columns, long timestamp) throws IOException {
+      TreeMap<Text, byte[]> columns, long timestamp)
+  throws IOException {
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-
     long seqNum[] = obtainSeqNum(columns.size());
-
-    // The 'lastSeqWritten' map holds the sequence number of the most recent
+    // The 'lastSeqWritten' map holds the sequence number of the oldest
     // write for each region. When the cache is flushed, the entry for the
     // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten
-
-    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
-
+    // is greater than or equal to the value in lastSeqWritten.
+    if (!this.lastSeqWritten.containsKey(regionName)) {
+      this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+    }
     int counter = 0;
     for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
       HLogKey logKey =
         new HLogKey(regionName, tableName, row, seqNum[counter++]);
       HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
-      writer.append(logKey, logEdit);
-      numEntries++;
+      this.writer.append(logKey, logEdit);
+      this.numEntries++;
     }
   }
 
@@ -426,9 +441,9 @@
    */
   private long[] obtainSeqNum(int num) {
     long[] results = new long[num];
-    synchronized (sequenceLock) {
+    synchronized (this.sequenceLock) {
       for (int i = 0; i < num; i++) {
-        results[i] = logSeqNum++;
+        results[i] = this.logSeqNum++;
       }
     }
     return results;
@@ -447,7 +462,7 @@
    * @see #abortCacheFlush()
    */
   long startCacheFlush() {
-    cacheFlushLock.lock();
+    this.cacheFlushLock.lock();
     return obtainSeqNum();
   }
 
@@ -462,25 +477,22 @@
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-      final Text tableName, final long logSeqId) throws IOException {
-
+    final Text tableName, final long logSeqId)
+  throws IOException {
     try {
       if (this.closed) {
         return;
       }
-
-      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-          new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-              System.currentTimeMillis()));
-
-      numEntries++;
-      Long seq = lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq) {
-        lastSeqWritten.remove(regionName);
+      this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+          System.currentTimeMillis()));
+      this.numEntries++;
+      Long seq = this.lastSeqWritten.get(regionName);
+      if (seq != null && logSeqId >= seq.longValue()) {
+        this.lastSeqWritten.remove(regionName);
       }
-
     } finally {
-      cacheFlushLock.unlock();
+      this.cacheFlushLock.unlock();
       notifyAll();              // wake up the log roller if it is waiting
     }
   }
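
For reference, a minimal sketch of the cleanup logic this patch introduces in rollWriter(), together with the matching change to append(). The class and member names below (LogCleanupSketch, oldLogs, oldestSeqPerRegion, noteAppend, completeFlush) are illustrative stand-ins for the real HLog state, with String in place of Path and plain logging in place of fs.delete():

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.TreeSet;

    public class LogCleanupSketch {
      // All closed log files, keyed by the highest sequence id each contains
      // (stands in for HLog.outputfiles).
      private final TreeMap<Long, String> oldLogs = new TreeMap<Long, String>();
      // Oldest unflushed sequence id per region (stands in for lastSeqWritten).
      private final Map<String, Long> oldestSeqPerRegion =
        new HashMap<String, Long>();

      // On append, record only the first (oldest) outstanding sequence id per
      // region; subsequent appends leave the entry alone.
      void noteAppend(final String region, final long seq) {
        if (!this.oldestSeqPerRegion.containsKey(region)) {
          this.oldestSeqPerRegion.put(region, Long.valueOf(seq));
        }
      }

      // A completed cache flush clears the region's entry, marking all its
      // earlier edits as durable.
      void completeFlush(final String region, final long flushSeq) {
        Long seq = this.oldestSeqPerRegion.get(region);
        if (seq != null && flushSeq >= seq.longValue()) {
          this.oldestSeqPerRegion.remove(region);
        }
      }

      void cleanOldLogs() {
        if (this.oldLogs.isEmpty()) {
          return;
        }
        if (this.oldestSeqPerRegion.isEmpty()) {
          // Every region has been flushed since the last write, so no old
          // log can hold a needed edit: delete them all.
          for (Map.Entry<Long, String> e : this.oldLogs.entrySet()) {
            deleteLogFile(e.getValue(), e.getKey());
          }
          this.oldLogs.clear();
          return;
        }
        // Otherwise only logs whose highest sequence id precedes the oldest
        // outstanding (unflushed) edit are safe to delete.
        long oldestOutstanding =
          new TreeSet<Long>(this.oldestSeqPerRegion.values()).first().longValue();
        TreeSet<Long> deletable = new TreeSet<Long>(
          this.oldLogs.headMap(Long.valueOf(oldestOutstanding)).keySet());
        for (Long seq : deletable) {
          deleteLogFile(this.oldLogs.remove(seq), seq);
        }
      }

      private void deleteLogFile(final String path, final Long seq) {
        System.out.println("removing old log " + path +
          " whose highest sequence/edit id is " + seq);
      }
    }

The key invariant: lastSeqWritten now records the oldest unflushed edit per region rather than the newest, so any log file whose highest id falls below the map's minimum can no longer be needed for recovery and may be deleted.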

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Thu Oct 11 07:45:36 2007
@@ -19,8 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -128,9 +126,10 @@
     try {
       Thread.sleep(10 * 1000);                  // Wait for region server to start
     } catch (InterruptedException e) {
+      // continue
     }
 
-    logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
+    this.logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
     
     // When the META table can be opened, the region servers are running
     @SuppressWarnings("unused")
@@ -155,13 +154,14 @@
         try {
           Thread.sleep(2000);
         } catch (InterruptedException e) {
+          // continue
         }
       }
     }
   }
   
-  private int countLogFiles(boolean print) throws IOException {
-    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
+  private int countLogFiles(final boolean print) throws Exception {
+    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir});
     if (print) {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
@@ -186,15 +186,18 @@
     conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
     try {
       startAndWriteData();
-      LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
-      try {
-        // Wait for log roller and cache flusher to run a few times...
-        Thread.sleep(30L * 1000L);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted", e);
+      int count = countLogFiles(true);
+      LOG.info("Finished writing. There are " + count + " log files. " +
+        "Sleeping to let cache flusher and log roller run");
+      while (count > 2) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+        }
+        count = countLogFiles(true);
       }
-      LOG.info("Wake from sleep");
-      assertTrue(countLogFiles(true) <= 2);
+      assertTrue(count <= 2);
     } catch (Exception e) {
       LOG.fatal("unexpected exception", e);
       throw e;
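
For reference, the shape of the wait loop the test now uses in place of the fixed 30-second sleep, with countLogFiles standing in for the test's helper of the same name:

    // Poll until the cache flusher and log roller have reduced the log
    // count to the expected bound, instead of sleeping a fixed interval
    // and hoping they have run.
    int count = countLogFiles(true);
    while (count > 2) {
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
        // continue polling
      }
      count = countLogFiles(true);
    }
    assertTrue(count <= 2);

Waiting on the observable condition rather than on elapsed time removes the timing sensitivity that made the test flaky on loaded patch and nightly build machines; note the committed loop polls without a deadline, so a regression would show up as a hung test rather than an assertion failure.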


