hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r579353 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/HLog.java src/java/org/apache/hadoop/hbase/HRegion.java src/test/org/apache/hadoop/hbase/TestLogRolling.java
Date Tue, 25 Sep 2007 19:13:51 GMT
Author: jimk
Date: Tue Sep 25 12:13:50 2007
New Revision: 579353

URL: http://svn.apache.org/viewvc?rev=579353&view=rev
Log:
HADOOP-1943 LogRolling test fails: reverting changes for HADOOP-1820

Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Sep 25 12:13:50 2007
@@ -45,6 +45,7 @@
     HADOOP-1813 OOME makes zombie of region server
     HADOOP-1814	TestCleanRegionServerExit fails too often on Hudson
     HADOOP-1820 Regionserver creates hlogs without bound
+                (reverted 2007/09/25)
     HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
     HADOOP-1832 listTables() returns duplicate tables
     HADOOP-1834 Scanners ignore timestamp passed on creation

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Tue Sep 25 12:13:50 2007
@@ -29,9 +29,6 @@
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * HLog stores all the edits to the HStore.
@@ -56,11 +53,6 @@
  * older (smaller) than the most-recent CACHEFLUSH message for every HRegion 
  * that has a message in F.
  * 
- * <p>synchronized methods can never execute in parallel. However, between the
- * start of a cache flush and the completion point, appends are allowed but log
- * rolling is not. To prevent log rolling taking place during this period, a
- * separate reentrant lock is used.
- * 
  * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
  * in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
  * The 'atomic' write to the log is meant to serve as insurance against
@@ -82,21 +74,20 @@
 
   SequenceFile.Writer writer;
   TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
-  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+  volatile boolean insideCacheFlush = false;
+
+  TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
 
   volatile boolean closed = false;
-  AtomicLong logSeqNum = new AtomicLong(0);
-  volatile long filenum = 0;
+  volatile long logSeqNum = 0;
+  long filenum = 0;
   AtomicInteger numEntries = new AtomicInteger(0);
 
-  // This lock prevents starting a log roll during a cache flush.
-  // synchronized is insufficient because a cache flush spans two method calls.
-  private final Lock cacheFlushLock = new ReentrantLock();
+  Integer rollLock = new Integer(0);
 
   /**
    * Split up a bunch of log files, that are no longer being written to,
-   * into new files, one per region.  Delete the old log files when finished.
-   * 
+   * into new files, one per region.  Delete the old log files when ready.
    * @param rootDir Root directory of the HBase instance
    * @param srcDir Directory of log files to split:
    * e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
@@ -189,105 +180,109 @@
     fs.mkdirs(dir);
     rollWriter();
   }
-
-  /**
-   * Called by HRegionServer when it opens a new region to ensure that log
-   * sequence numbers are always greater than the latest sequence number of
-   * the region being brought on-line.
-   * 
-   * @param newvalue
-   */
+  
   synchronized void setSequenceNumber(long newvalue) {
-    if (newvalue > logSeqNum.get()) {
+    if (newvalue > logSeqNum) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("changing sequence number from " + logSeqNum + " to " +
             newvalue);
       }
-      logSeqNum.set(newvalue);
+      logSeqNum = newvalue;
     }
   }
 
   /**
    * Roll the log writer.  That is, start writing log messages to a new file.
-   * 
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a
-   * cache flush cannot start when the log is being rolled and the log cannot
-   * be rolled during a cache flush.
-   * 
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the 
-   * cacheFlushLock and then completeCacheFlush could be called which would 
-   * wait for the lock on this and consequently never release the cacheFlushLock
+   *
+   * The 'rollLock' prevents us from entering rollWriter() more than
+   * once at a time.
+   *
+   * The 'this' lock limits access to the current writer so
+   * we don't append multiple items simultaneously.
    * 
    * @throws IOException
    */
   void rollWriter() throws IOException {
-    if(closed) {
-      throw new IOException("Cannot roll log; log is closed");
-    }
+    synchronized(rollLock) {
 
-    cacheFlushLock.lock();                              // prevent cache flushes
-    try {
-      // Now that we have locked out cache flushes, lock this to prevent other
-      // changes.
+      // Try to roll the writer to a new file.  We may have to
+      // wait for a cache-flush to complete.  In the process,
+      // compute a list of old log files that can be deleted.
+
+      Vector<Path> toDeleteList = new Vector<Path>();
+      synchronized(this) {
+        if(closed) {
+          throw new IOException("Cannot roll log; log is closed");
+        }
 
-      synchronized (this) {
-        if (writer != null) { // Close the current writer (if any), get a new one.
+        // Make sure we do not roll the log while inside a
+        // cache-flush.  Otherwise, the log sequence number for
+        // the CACHEFLUSH operation will appear in a "newer" log file
+        // than it should.
+        while(insideCacheFlush) {
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+            // continue;
+          }
+        }
+
+        // Close the current writer (if any), and grab a new one.
+        if(writer != null) {
           writer.close();
           Path p = computeFilename(filenum - 1);
           if(LOG.isDebugEnabled()) {
             LOG.debug("Closing current log writer " + p.toString() +
-            " to get a new one");
+              " to get a new one");
           }
           if (filenum > 0) {
-            outputfiles.put(logSeqNum.get() - 1, p);
+            outputfiles.put(logSeqNum - 1, p);
           }
         }
         Path newPath = computeFilename(filenum++);
-        this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class,
-            HLogEdit.class);
-
-        if (LOG.isDebugEnabled()) {
+        this.writer = SequenceFile.createWriter(fs, conf, newPath,
+          HLogKey.class, HLogEdit.class);
+        if(LOG.isDebugEnabled()) {
           LOG.debug("new log writer created at " + newPath);
         }
-
+        
         // Can we delete any of the old log files?
         // First, compute the oldest relevant log operation 
         // over all the regions.
 
         long oldestOutstandingSeqNum = Long.MAX_VALUE;
-        for (Long l: lastSeqWritten.values()) {
+        for(Long l: regionToLastFlush.values()) {
           long curSeqNum = l.longValue();
-
-          if (curSeqNum < oldestOutstandingSeqNum) {
+          
+          if(curSeqNum < oldestOutstandingSeqNum) {
             oldestOutstandingSeqNum = curSeqNum;
           }
         }
 
-        // Get the set of all sequence numbers that are older than the oldest
-        // pending region operation
-
-        TreeSet<Long> sequenceNumbers = new TreeSet<Long>();
-        sequenceNumbers.addAll(
-            outputfiles.headMap(oldestOutstandingSeqNum).keySet());
-        
-        // Remove all files with a final ID that's older than the oldest
-        // pending region-operation.
-
-        for (Long seq: sequenceNumbers) {
-          Path p = outputfiles.remove(seq);
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("removing old log file " + p.toString());
+        // Next, remove all files with a final ID that's older
+        // than the oldest pending region-operation.
+        for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
+          long maxSeqNum = it.next().longValue();
+          if(maxSeqNum < oldestOutstandingSeqNum) {
+            Path p = outputfiles.get(maxSeqNum);
+            it.remove();
+            toDeleteList.add(p);
+            
+          } else {
+            break;
           }
-          fs.delete(p);
         }
-        this.numEntries.set(0);
       }
-      
-    } finally {
-      cacheFlushLock.unlock();
+
+      // Actually delete them, if any!
+      for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
+        Path p = it.next();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("removing old log file " + p.toString());
+        }
+        fs.delete(p);
+      }
+      this.numEntries.set(0);
     }
   }
 
@@ -333,9 +328,7 @@
    * other systems should process the log appropriately upon each startup
    * (and prior to initializing HLog).
    *
-   * synchronized prevents appends during the completion of a cache flush or
-   * for the duration of a log roll.
-   * 
+   * We need to seize a lock on the writer so that writes are atomic.
    * @param regionName
    * @param tableName
    * @param row
@@ -344,19 +337,21 @@
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-      TreeMap<Text, byte []> columns, long timestamp) throws IOException {
+      TreeMap<Text, byte []> columns, long timestamp)
+  throws IOException {
     if(closed) {
       throw new IOException("Cannot append; log is closed");
     }
-
-    long seqNum[] = obtainSeqNum(columns.size());
     
-    // The 'lastSeqWritten' map holds the sequence number of the most recent
-    // write for each region. When the cache is flushed, the entry for the 
-    // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten
-    
-    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
+    long seqNum[] = obtainSeqNum(columns.size());
+
+    // The 'regionToLastFlush' map holds the sequence id of the
+    // most recent flush for every regionName.  However, for regions
+    // that don't have any flush yet, the relevant operation is the
+    // first one that's been added.
+    if (regionToLastFlush.get(regionName) == null) {
+      regionToLastFlush.put(regionName, seqNum[0]);
+    }
 
     int counter = 0;
     for (Map.Entry<Text, byte []> es: columns.entrySet()) {
@@ -368,39 +363,29 @@
     }
   }
 
-  /**
-   * @return How many items have been added to the log
-   * 
-   * Because numEntries is an AtomicInteger, no locking is required.
-   */
+  /** @return How many items have been added to the log */
   int getNumEntries() {
     return numEntries.get();
   }
 
   /**
-   * Obtain a log sequence number.
-   * 
-   * Because it is only called from a synchronized method, no additional locking
-   * is required.
+   * Obtain a log sequence number.  This seizes the whole HLog
+   * lock, but it shouldn't last too long.
    */
-  private long obtainSeqNum() {
-    return logSeqNum.getAndIncrement();
+  synchronized long obtainSeqNum() {
+    return logSeqNum++;
   }
   
   /**
    * Obtain a specified number of sequence numbers
    * 
-   * Because it is only called from a synchronized method, no additional locking
-   * is required.
-   * 
    * @param num - number of sequence numbers to obtain
    * @return - array of sequence numbers
    */
-  private long[] obtainSeqNum(int num) {
-    long sequenceNumber = logSeqNum.getAndAdd(num);
+  synchronized long[] obtainSeqNum(int num) {
     long[] results = new long[num];
     for (int i = 0; i < num; i++) {
-      results[i] = sequenceNumber++;
+      results[i] = logSeqNum++;
     }
     return results;
   }
@@ -409,50 +394,54 @@
    * By acquiring a log sequence ID, we can allow log messages
    * to continue while we flush the cache.
    *
-   * Acquire a lock so that we do not roll the log between the start
-   * and completion of a cache-flush.  Otherwise the log-seq-id for
+   * Set a flag so that we do not roll the log between the start
+   * and complete of a cache-flush.  Otherwise the log-seq-id for
    * the flush will not appear in the correct logfile.
-   * 
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
    * @see #abortCacheFlush()
    */
   synchronized long startCacheFlush() {
-    cacheFlushLock.lock();
+    while (this.insideCacheFlush) {
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        // continue
+      }
+    }
+    this.insideCacheFlush = true;
+    notifyAll();
     return obtainSeqNum();
   }
 
-  /** 
-   * Complete the cache flush
-   * 
-   * Protected by this.lock()
-   * 
+  /** Complete the cache flush
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId) throws IOException {
-
-    try {
-      if(this.closed) {
-        return;
-      }
-      
-      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-          new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-              System.currentTimeMillis()));
-      
-      numEntries.getAndIncrement();
-      Long seq = lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq) {
-        lastSeqWritten.remove(regionName);
-      }
-
-    } finally {
-      cacheFlushLock.unlock();
+    final Text tableName, final long logSeqId)
+  throws IOException {
+    if(this.closed) {
+      return;
     }
+    
+    if (!this.insideCacheFlush) {
+      throw new IOException("Impossible situation: inside " +
+        "completeCacheFlush(), but 'insideCacheFlush' flag is false");
+    }
+    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+    this.writer.append(key,
+      new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+        System.currentTimeMillis()));
+    this.numEntries.getAndIncrement();
+
+    // Remember the most-recent flush for each region.
+    // This is used to delete obsolete log files.
+    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
+
+    cleanup();
   }
   
   /**
@@ -462,8 +451,23 @@
    * is a restart of the regionserver so the snapshot content dropped by the
    * failure gets restored to the  memcache.
    */
-  void abortCacheFlush() {
-    this.cacheFlushLock.unlock();
+  synchronized void abortCacheFlush() {
+    cleanup();
+  }
+  
+  private synchronized void cleanup() {
+    this.insideCacheFlush = false;
+    notifyAll();
+  }
+  
+  /**
+   * Abort a cache flush.
+   * This method will clear waits on {@link #insideCacheFlush} but if this
+   * method is called, we are losing data.  TODO: Fix.
+   */
+  synchronized void abort() {
+    this.insideCacheFlush = false;
+    notifyAll();
   }
 
   private static void usage() {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Sep 25 12:13:50 2007
@@ -210,7 +210,6 @@
   final int memcacheFlushSize;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
-  protected final int optionalFlushCount;
   private final HLocking lock = new HLocking();
   private long desiredMaxFileSize;
   private final long maxSequenceId;
@@ -248,8 +247,6 @@
     this.regionInfo = regionInfo;
     this.memcache = new HMemcache();
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.optionalFlushCount =
-      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -731,13 +728,11 @@
   void optionallyFlush() throws IOException {
     if(this.memcache.getSize() > this.memcacheFlushSize) {
       flushcache(false);
-    } else if (this.memcache.getSize() > 0) {
-      if (this.noFlushCount >= this.optionalFlushCount) {
-        LOG.info("Optional flush called " + this.noFlushCount +
-            " times when data present without flushing.  Forcing one.");
-        flushcache(false);
-        
-      } else {
+    } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
+      LOG.info("Optional flush called " + this.noFlushCount +
+        " times when data present without flushing.  Forcing one.");
+      flushcache(false);
+      if (this.memcache.getSize() > 0) {
         // Only increment if something in the cache.
         // Gets zero'd when a flushcache is called.
         this.noFlushCount++;
@@ -869,31 +864,25 @@
             retval.memcacheSnapshot.size());
       }
 
-      try {
-        // A.  Flush memcache to all the HStores.
-        // Keep running vector of all store files that includes both old and the
-        // just-made new flush store file.
-        for (HStore hstore: stores.values()) {
-          hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-        }
-      } catch (IOException e) {
-        // An exception here means that the snapshot was not persisted.
-        // The hlog needs to be replayed so its content is restored to memcache.
-        // Currently, only a server restart will do this.
-        this.log.abortCacheFlush();
-        throw new DroppedSnapshotException(e.getMessage());
+      // A.  Flush memcache to all the HStores.
+      // Keep running vector of all store files that includes both old and the
+      // just-made new flush store file.
+      for (HStore hstore: stores.values()) {
+        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
       }
 
-      // If we get to here, the HStores have been written. If we get an
-      // error in completeCacheFlush it will release the lock it is holding
-
       // B.  Write a FLUSHCACHE-COMPLETE message to the log.
       //     This tells future readers that the HStores were emitted correctly,
       //     and that all updates to the log for this regionName that have lower 
       //     log-sequence-ids can be safely ignored.
       this.log.completeCacheFlush(this.regionInfo.regionName,
-          regionInfo.tableDesc.getName(), logCacheFlushId);
-
+        regionInfo.tableDesc.getName(), logCacheFlushId);
+    } catch (IOException e) {
+      // An exception here means that the snapshot was not persisted.
+      // The hlog needs to be replayed so its content is restored to memcache.
+      // Currently, only a server restart will do this.
+      this.log.abortCacheFlush();
+      throw new DroppedSnapshotException(e.getMessage());
     } finally {
       // C. Delete the now-irrelevant memcache snapshot; its contents have been 
       //    dumped to disk-based HStores or, if error, clear aborted snapshot.



Mime
View raw message