hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r919866 - in /hadoop/hbase/branches/0.20: ./ conf/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/regionserver/
Date: Sat, 06 Mar 2010 22:38:31 GMT
Author: stack
Date: Sat Mar  6 22:38:30 2010
New Revision: 919866

URL: http://svn.apache.org/viewvc?rev=919866&view=rev
Log:
HBASE-2298 Backport of "HLog Group Commit" to 0.20 branch

Removed:
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/conf/hbase-default.xml
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Sat Mar  6 22:38:30 2010
@@ -28,6 +28,8 @@
                half-written hfile (Ruslan Salyakhov via Stack)
    HBASE-2288  Shell fails on alter
    HBASE-2293  CME in RegionManager#isMetaServer
+   HBASE-2261  The javadoc in WhileMatchFilter and it's tests in TestFilter are
+               not accurate/wrong
 
   IMPROVEMENTS
    HBASE-2180  Bad read performance from synchronizing hfile.fddatainputstream
@@ -48,6 +50,8 @@
    HBASE-2263  [stargate] multiuser mode: authenticator for zookeeper
    HBASE-2273  [stargate] export metrics via Hadoop metrics, JMX, and zookeeper
    HBASE-2274  [stargate] filter support: JSON descriptors
+   HBASE-2298  Backport of "HLog Group Commit" to 0.20 branch
+               (Nicolas Spiegelberg via Stack)
 
 Release 0.20.3 - January 25th, 2010
   INCOMPATIBLE CHANGES

Modified: hadoop/hbase/branches/0.20/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/conf/hbase-default.xml?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/conf/hbase-default.xml (original)
+++ hadoop/hbase/branches/0.20/conf/hbase-default.xml Sat Mar  6 22:38:30 2010
@@ -169,16 +169,16 @@
   </property>
   <property>
     <name>hbase.regionserver.flushlogentries</name>
-    <value>100</value>
+    <value>1</value>
     <description>Sync the HLog to the HDFS when it has accumulated this many
-    entries. Default 100. Value is checked on every HLog.sync
+    entries. Default 1. Value is checked on every HLog.sync
     </description>
   </property>
   <property>
     <name>hbase.regionserver.optionallogflushinterval</name>
-    <value>10000</value>
+    <value>1000</value>
     <description>Sync the HLog to the HDFS after this interval if it has not
-    accumulated enough entries to trigger a sync. Default 10 seconds. Units:
+    accumulated enough entries to trigger a sync. Default 1 second. Units:
     milliseconds.
     </description>
   </property>
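
Note on the two defaults above: with group commit in place (see the HLog.java changes below), concurrent handlers share a single HDFS sync, so the defaults become aggressive: sync after every entry, and sweep idle edits after one second instead of ten. For reference, a minimal sketch of how these properties are consumed; it mirrors the HLog constructor change further down, but the WalFlushSettings class and its main method are only a hypothetical harness, not part of this patch:

    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class WalFlushSettings {
      public static void main(String[] args) {
        HBaseConfiguration conf = new HBaseConfiguration();
        // Defaults match the new values in hbase-default.xml; an hbase-site.xml override still wins.
        int flushLogEntries =
            conf.getInt("hbase.regionserver.flushlogentries", 1);
        long optionalFlushIntervalMs =
            conf.getLong("hbase.regionserver.optionallogflushinterval", 1000);
        System.out.println("sync after " + flushLogEntries + " unflushed entr(y/ies), or after "
            + optionalFlushIntervalMs + " ms without a sync");
      }
    }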

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat Mar  6 22:38:30 2010
@@ -33,14 +33,15 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,9 +58,11 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -67,13 +70,15 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 
 /**
- * HLog stores all the edits to the HStore.
+ * HLog stores all the edits to the HStore. It's the hbase write-ahead-log
+ * implementation.
  *
  * It performs logfile-rolling, so external callers are not aware that the
  * underlying file is being rolled.
  *
  * <p>
- * A single HLog is used by several HRegions simultaneously.
+ * There is one HLog per RegionServer.  All edits for all Regions carried by
+ * a particular RegionServer are entered first in the HLog.
  *
  * <p>
  * Each HRegion is identified by a unique long <code>int</code>. HRegions do
@@ -103,7 +108,7 @@
 public class HLog implements HConstants, Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
   private static final String HLOG_DATFILE = "hlog.dat.";
-  static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
+  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
   private final FileSystem fs;
   private final Path dir;
@@ -113,10 +118,12 @@
   private final long blocksize;
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
-  private volatile long lastLogFlushTime;
   private final boolean append;
   private final Method syncfs;
   private final static Object [] NO_ARGS = new Object []{};
+  
+  // used to indirectly tell syncFs to force the sync
+  private boolean forceSync = false;
 
   /*
    * Current log file.
@@ -166,6 +173,11 @@
    */
   private final int maxLogs;
 
+  /**
+   * Thread that handles group commit
+   */
+  private final LogSyncer logSyncerThread;
+
   static byte [] COMPLETE_CACHE_FLUSH;
   static {
     try {
@@ -228,15 +240,14 @@
     this.conf = conf;
     this.listener = listener;
     this.flushlogentries =
-      conf.getInt("hbase.regionserver.flushlogentries", 100);
+      conf.getInt("hbase.regionserver.flushlogentries", 1);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
       this.fs.getDefaultBlockSize());
     // Roll at 95% of block size.
     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
-      conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
-    this.lastLogFlushTime = System.currentTimeMillis();
+      conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -264,6 +275,10 @@
       }
     }
     this.syncfs = m;
+    
+    logSyncerThread = new LogSyncer(this.optionalFlushInterval);
+    Threads.setDaemonThreadRunning(logSyncerThread,
+        Thread.currentThread().getName() + ".logSyncer");
   }
 
   /**
@@ -282,7 +297,7 @@
     // Compression makes no sense for commit log.  Always return NONE.
     return CompressionType.NONE;
   }
-
+  
   /**
    * Called by HRegionServer when it opens a new region to ensure that log
    * sequence numbers are always greater than the latest sequence number of the
@@ -291,7 +306,7 @@
    * @param newvalue We'll set log edit/sequence number to this value if it
    * is greater than the current value.
    */
-  void setSequenceNumber(final long newvalue) {
+  public void setSequenceNumber(final long newvalue) {
     for (long id = this.logSeqNum.get(); id < newvalue &&
         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
       // This could spin on occasion but better the occasional spin than locking
@@ -323,7 +338,7 @@
    *
    * @return If lots of logs, flush the returned regions so next time through
    * we can clean logs. Returns null if nothing to flush.
-   * @throws FailedLogCloseException
+   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
    * @throws IOException
    */
   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
@@ -366,7 +381,6 @@
         }
         this.numEntries.set(0);
         this.editsSize.set(0);
-        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();
@@ -377,7 +391,7 @@
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
     return createWriter(path, HLogKey.class, KeyValue.class);
   }
-  
+
   protected SequenceFile.Writer createWriter(Path path,
       Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
       throws IOException {
@@ -539,6 +553,14 @@
    * @throws IOException
    */
   public void close() throws IOException {
+    try {
+      logSyncerThread.interrupt();
+      // Make sure we synced everything
+      logSyncerThread.join(this.optionalFlushInterval*2);
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for syncer thread to die", e);
+    }
+
     cacheFlushLock.lock();
     try {
       synchronized (updateLock) {
@@ -547,7 +569,6 @@
           LOG.debug("closing hlog writer in " + this.dir.toString());
         }
         this.writer.close();
-        updateLock.notifyAll();
       }
     } finally {
       cacheFlushLock.unlock();
@@ -603,10 +624,9 @@
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-      boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
-      doWrite(logKey, logEdit, sync);
+      doWrite(logKey, logEdit);
+      this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
-      updateLock.notifyAll();
     }
     if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
@@ -640,7 +660,7 @@
    * @throws IOException
    */
   public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    boolean sync, final long now)
+    final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -656,50 +676,146 @@
       int counter = 0;
       for (KeyValue kv: edits) {
         HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
-        doWrite(logKey, kv, sync);
+        doWrite(logKey, kv);
         this.numEntries.incrementAndGet();
       }
-      updateLock.notifyAll();
+      // Only count 1 row as an unflushed entry
+      this.unflushedEntries.incrementAndGet();
     }
     if (this.editsSize.get() > this.logrollsize) {
         requestLogRoll();
     }
   }
 
-  public void sync() throws IOException {
-    lastLogFlushTime = System.currentTimeMillis();
-    if (this.append && syncfs != null) {
-      try {
-        this.syncfs.invoke(this.writer, NO_ARGS);
-      } catch (Exception e) {
-        throw new IOException("Reflection", e);
-      }
-    } else {
-      this.writer.sync();
+  /**
+    * This thread is responsible to call syncFs and buffer up the writers while
+    * it happens.
+    */
+   class LogSyncer extends Thread {
+ 
+     // Using fairness to make sure locks are given in order
+     private final ReentrantLock lock = new ReentrantLock(true);
+
+     // Condition used to wait until we have something to sync
+     private final Condition queueEmpty = lock.newCondition();
+ 
+    // Condition used to signal that the sync is done
+    private final Condition syncDone = lock.newCondition();
+
+    private final long optionalFlushInterval;
+
+    LogSyncer(long optionalFlushInterval) {
+      this.optionalFlushInterval = optionalFlushInterval;
     }
-    this.unflushedEntries.set(0);
-    syncTime += System.currentTimeMillis() - lastLogFlushTime;
-    syncOps++;
+
+    @Override
+    public void run() {
+      try {
+        lock.lock();
+         // awaiting with a timeout doesn't always
+         // throw exceptions on interrupt
+         while(!this.isInterrupted()) {
+ 
+           // Wait until something has to be hflushed or do it if we waited
+           // enough time (useful if something appends but does not hflush).
+           // 0 or less means that it timed out and maybe waited a bit more.
+           if (!(queueEmpty.awaitNanos(
+                   this.optionalFlushInterval*1000000) <= 0)) {
+             forceSync = true;
+           }
+ 
+           // We got the signal, let's hflush. We currently own the lock so new
+           // writes are waiting to acquire it in addToSyncQueue while the ones
+           // we hflush are waiting on await()
+           hflush();
+ 
+           // Release all the clients waiting on the hflush. Notice that we still
+           // own the lock until we get back to await at which point all the
+           // other threads waiting will first acquire and release locks
+           syncDone.signalAll();
+         }
+       } catch (IOException e) {
+         LOG.error("Error while syncing, requesting close of hlog ", e);
+         requestLogRoll();
+       } catch (InterruptedException e) {
+         LOG.debug(getName() + "interrupted while waiting for sync requests");
+       } finally {
+         syncDone.signalAll();
+         lock.unlock();
+         LOG.info(getName() + " exiting");
+       }
+     }
+ 
+     /**
+      * This method first signals the thread that there's a sync needed
+      * and then waits for it to happen before returning.
+      */
+     public void addToSyncQueue(boolean force) {
+ 
+       // Don't bother if somehow our append was already hflush
+       if (unflushedEntries.get() == 0) {
+         return;
+       }
+       lock.lock();
+       try {
+         if(force) {
+           forceSync = true;
+         }
+         // Wake the thread
+         queueEmpty.signal();
+ 
+         // Wait for it to hflush
+         syncDone.await();
+       } catch (InterruptedException e) {
+         LOG.debug(getName() + " was interrupted while waiting for sync", e);
+       }
+       finally {
+         lock.unlock();
+       }
+    }
+  }
+
+  public void sync() {
+    sync(false);
+  }
+
+  /**
+   * This method calls the LogSyncer in order to group commit the sync
+   * with other threads.
+   * @param force For catalog regions, force the sync to happen
+   */
+  public void sync(boolean force) {
+    logSyncerThread.addToSyncQueue(force);
   }
+  
+  protected void hflush() throws IOException {
+    synchronized (this.updateLock) {
+      if (this.closed) {
+        return;
+      }
 
-  void optionalSync() {
-    if (!this.closed) {
-      long now = System.currentTimeMillis();
-      synchronized (updateLock) {
-        if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
-            this.unflushedEntries.get() > 0) {
-          try {
-            sync();
-          } catch (IOException e) {
-            LOG.error("Error flushing hlog", e);
+      if (this.forceSync ||
+         this.unflushedEntries.get() >= this.flushlogentries) {
+        try {
+          long now = System.currentTimeMillis();
+          this.writer.sync();
+          if (this.append && syncfs != null) {
+            try {
+             this.syncfs.invoke(this.writer, NO_ARGS); 
+            } catch (Exception e) {
+              throw new IOException("Reflection", e);
+            }
           }
+          syncTime += System.currentTimeMillis() - now;
+          syncOps++;
+          this.forceSync = false;
+          this.unflushedEntries.set(0);
+        } catch (IOException e) {
+          LOG.fatal("Could not append. Requesting close of hlog", e);
+          requestLogRoll();
+          throw e;
         }
       }
-      long took = System.currentTimeMillis() - now;
-      if (took > 1000) {
-        LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
-      }
     }
   }
 
@@ -709,7 +825,7 @@
     }
   }
   
-  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+  private void doWrite(HLogKey logKey, KeyValue logEdit)
   throws IOException {
     if (!this.enabled) {
       return;
@@ -725,9 +841,6 @@
         LOG.warn(Thread.currentThread().getName() + " took " + took +
           "ms appending an edit to hlog; editcount=" + this.numEntries.get());
       }
-      if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
-        sync();
-      }
     } catch (IOException e) {
       LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();
@@ -778,7 +891,7 @@
    * @see #completeCacheFlush(Text, Text, long)
    * @see #abortCacheFlush()
    */
-  long startCacheFlush() {
+  public long startCacheFlush() {
     this.cacheFlushLock.lock();
     return obtainSeqNum();
   }
@@ -793,7 +906,7 @@
    * @param logSeqId
    * @throws IOException
    */
-  void completeCacheFlush(final byte [] regionName, final byte [] tableName,
+  public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
     final long logSeqId)
   throws IOException {
     try {
@@ -811,7 +924,6 @@
         if (seq != null && logSeqId >= seq.longValue()) {
           this.lastSeqWritten.remove(regionName);
         }
-        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();
@@ -829,7 +941,7 @@
    * currently is a restart of the regionserver so the snapshot content dropped
    * by the failure gets restored to the memstore.
    */
-  void abortCacheFlush() {
+  public void abortCacheFlush() {
     this.cacheFlushLock.unlock();
   }
 
@@ -894,12 +1006,13 @@
     }
   }
   
-   static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
-     return (Class<? extends HLogKey>) conf
-        .getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
+  @SuppressWarnings("unchecked")
+  public static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
+    return (Class<? extends HLogKey>) 
+       conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
   }
   
-   static HLogKey newKey(HBaseConfiguration conf) throws IOException {
+  public static HLogKey newKey(HBaseConfiguration conf) throws IOException {
     Class<? extends HLogKey> keyClass = getKeyClass(conf);
     try {
       return keyClass.newInstance();
@@ -1038,18 +1151,11 @@
                     Path oldlogfile = null;
                     SequenceFile.Reader old = null;
                     if (fs.exists(logfile)) {
-                      FileStatus stat = fs.getFileStatus(logfile);
-                      if (stat.getLen() <= 0) {
-                        LOG.warn("Old hlog file " + logfile + " is zero " +
-                          "length. Deleting existing file");
-                        fs.delete(logfile, false);
-                      } else {
-                        LOG.warn("Old hlog file " + logfile + " already " +
-                          "exists. Copying existing file to new file");
-                        oldlogfile = new Path(logfile.toString() + ".old");
-                        fs.rename(logfile, oldlogfile);
-                        old = new SequenceFile.Reader(fs, oldlogfile, conf);
-                      }
+                      LOG.warn("Old hlog file " + logfile 
+                        + " already exists. Copying existing file to new file");
+                      oldlogfile = new Path(logfile.toString() + ".old");
+                      fs.rename(logfile, oldlogfile);
+                      old = new SequenceFile.Reader(fs, oldlogfile, conf);
                     }
                     SequenceFile.Writer w =
                       SequenceFile.createWriter(fs, conf, logfile,
@@ -1115,24 +1221,24 @@
     return splits;
   }
 
-  /**
-   * @param conf
-   * @return True if append enabled and we have the syncFs in our path.
-   */
+ /*  
+  * @param conf
+  * @return True if append enabled and we have the syncFs in our path.
+  */
   private static boolean isAppend(final HBaseConfiguration conf) {
-      boolean append = conf.getBoolean("dfs.support.append", false);
-      if (append) {
-        try {
-          SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
-          append = true;
-        } catch (SecurityException e) {
-        } catch (NoSuchMethodException e) {
-          append = false;
-        }
+    boolean append = conf.getBoolean("dfs.support.append", false);
+    if (append) {
+      try {
+        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+        append = true;
+      } catch (SecurityException e) {
+      } catch (NoSuchMethodException e) {
+        append = false;
       }
-      return append;
     }
-
+    return append;
+  }
+  
   /**
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
@@ -1165,24 +1271,15 @@
       return key;
     }
 
+    @Override
     public String toString() {
       return this.key + "=" + this.edit;
     }
   }
 
-  /**
-   * Construct the HLog directory name
-   * 
-   * @param info HServerInfo for server
-   * @return the HLog directory name
-   */
-  public static String getHLogDirectoryName(HServerInfo info) {
-    return getHLogDirectoryName(HServerInfo.getServerName(info));
-  }
-
   /*
    * Recover log.
-   * If append has been set, try and open log in append mode.
+   * Try and open log in append mode.
    * Doing this, we get a hold of the file that crashed writer
    * was writing to.  Once we have it, close it.  This will
    * allow subsequent reader to see up to last sync.
@@ -1190,7 +1287,7 @@
    * @param p
    * @param append
    */
-  private static void recoverLog(final FileSystem fs, final Path p,
+  public static void recoverLog(final FileSystem fs, final Path p,
       final boolean append) {
     if (!append) {
       return;
@@ -1217,6 +1314,16 @@
   /**
    * Construct the HLog directory name
    * 
+   * @param info HServerInfo for server
+   * @return the HLog directory name
+   */
+  public static String getHLogDirectoryName(HServerInfo info) {
+    return getHLogDirectoryName(HServerInfo.getServerName(info));
+  }
+
+  /**
+   * Construct the HLog directory name
+   * 
    * @param serverAddress
    * @param startCode
    * @return the HLog directory name
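
The LogSyncer added to HLog.java above is the heart of the group commit: writers append under updateLock, bump unflushedEntries, and then block in addToSyncQueue() until the syncer thread has hflush()ed on their behalf. Below is a stripped-down, self-contained sketch of that lock/condition handshake. The names GroupCommitter, requestSync and doSync are placeholders, and the real LogSyncer additionally checks unflushedEntries, honours the forceSync flag, and requests a log roll when the sync fails.

    import java.io.IOException;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class GroupCommitter extends Thread {
      // Fair lock so waiting writers are released in arrival order.
      private final ReentrantLock lock = new ReentrantLock(true);
      // Signalled by writers when there is something to sync.
      private final Condition queueEmpty = lock.newCondition();
      // Signalled by this thread once a sync has completed.
      private final Condition syncDone = lock.newCondition();
      private final long flushIntervalMs;

      GroupCommitter(long flushIntervalMs) {
        this.flushIntervalMs = flushIntervalMs;
      }

      public void run() {
        lock.lock();
        try {
          while (!isInterrupted()) {
            // Wait for a writer, or time out so idle appends still get synced.
            queueEmpty.awaitNanos(flushIntervalMs * 1000000L);
            // One sync covers every writer queued up so far; they are parked
            // on syncDone while new arrivals block on lock.lock() in requestSync.
            doSync();
            syncDone.signalAll();
          }
        } catch (InterruptedException e) {
          // Shutdown path; fall through and release any waiters.
        } catch (IOException e) {
          // The real LogSyncer requests a log roll here.
        } finally {
          syncDone.signalAll();
          lock.unlock();
        }
      }

      /** Called by writer threads after appending; blocks until the next sync. */
      void requestSync() {
        lock.lock();
        try {
          queueEmpty.signal();   // wake the committer
          syncDone.await();      // wait until it has flushed our edit
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          lock.unlock();
        }
      }

      // Stand-in for the writer.sync()/syncFs() work done in hflush() above.
      private void doSync() throws IOException {
      }
    }

The fair ReentrantLock and the two conditions reproduce the comments in the patch: while the committer holds the lock and flushes, new writers queue behind it, and a single signalAll() releases every writer whose edit that flush covered.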

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Mar  6 22:38:30 2010
@@ -1205,8 +1205,7 @@
     try {
       if (writeToWAL) {
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+          regionInfo.getTableDesc().getName(), kvs, now);
       }
       long size = 0;
       Store store = getStore(family);
@@ -1511,8 +1510,7 @@
       if (writeToWAL) {
         long now = System.currentTimeMillis();
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+          regionInfo.getTableDesc().getName(), edits, now);
       }
       long size = 0;
       Store store = getStore(family);
@@ -2461,8 +2459,7 @@
         List<KeyValue> edits = new ArrayList<KeyValue>(1);
         edits.add(newKv);
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+          regionInfo.getTableDesc().getName(), edits, now);
       }
 
       // Now request the ICV to the store, this will set the timestamp

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Mar  6 22:38:30 2010
@@ -208,7 +208,6 @@
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
   LogRoller hlogRoller;
-  LogFlusher hlogFlusher;
   
   // flag set after we're done setting up server threads (used for testing)
   protected volatile boolean isOnline;
@@ -253,7 +252,7 @@
     this.fsOk = true;
     this.conf = conf;
     this.connection = ServerConnectionManager.getConnection(conf);
-
+    
     this.isOnline = false;
     
     // Config'ed params
@@ -331,10 +330,6 @@
     // Log rolling thread
     this.hlogRoller = new LogRoller(this);
     
-    // Log flushing thread
-    this.hlogFlusher =
-      new LogFlusher(this.threadWakeFrequency, this.stopRequested);
-    
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while.  Make it run at a lesser frequency.
     int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
@@ -522,7 +517,6 @@
                   try {
                     serverInfo.setStartCode(System.currentTimeMillis());
                     hlog = setupHLog();
-                    this.hlogFlusher.setHLog(hlog);
                   } catch (IOException e) {
                     this.abortRequested = true;
                     this.stopRequested.set(true);
@@ -624,7 +618,6 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
-    hlogFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
     hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
@@ -792,7 +785,6 @@
 
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.hlog = setupHLog();
-      this.hlogFlusher.setHLog(hlog);
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       startServiceThreads();
@@ -1149,8 +1141,6 @@
     };
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
         handler);
-    Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
-        handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
@@ -1772,6 +1762,8 @@
         this.cacheFlusher.reclaimMemStoreMemory();
       }
       region.put(put, getLockFromId(put.getLockId()));
+      
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1782,8 +1774,11 @@
     // Count of Puts processed.
     int i = 0;
     checkOpen();
+    boolean isMetaRegion = false;
     try {
       HRegion region = getRegion(regionName);
+      isMetaRegion = region.getRegionInfo().isMetaRegion();
+      
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
@@ -1802,7 +1797,13 @@
       throw convertThrowableToIOE(cleanup(t));
     }
     // All have been processed successfully.
-    return -1;
+    this.hlog.sync(isMetaRegion);
+    
+    if (i == puts.length) {
+      return -1;
+    } else {
+      return i;
+    }
   }
 
   /**
@@ -1830,8 +1831,11 @@
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.checkAndPut(row, family, qualifier, value, put,
+      boolean retval = region.checkAndPut(row, family, qualifier, value, put,
           getLockFromId(put.getLockId()), true);
+
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
+      return retval;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1992,6 +1996,7 @@
       }
       Integer lid = getLockFromId(delete.getLockId());
       region.delete(delete, lid, writeToWAL);
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -2002,9 +2007,11 @@
     // Count of Deletes processed.
     int i = 0;
     checkOpen();
+    boolean isMetaRegion = false;
     try {
       boolean writeToWAL = true;
       HRegion region = getRegion(regionName);
+      isMetaRegion = region.getRegionInfo().isMetaRegion();
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
@@ -2022,6 +2029,8 @@
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
+    
+    this.hlog.sync(isMetaRegion);
     // All have been processed successfully.
     return -1;
   }
@@ -2477,8 +2486,11 @@
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, family, qualifier, amount, 
+      long retval = region.incrementColumnValue(row, family, qualifier, amount, 
           writeToWAL);
+      
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
+      return retval;
     } catch (IOException e) {
       checkFileSystem();
       throw e;
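
With the sync call removed from HLog.doWrite()/append(), the handlers above take over responsibility for durability: after each region operation returns, the handler calls this.hlog.sync(isMetaRegion), forcing an immediate flush only for catalog (ROOT/META) regions. A hedged illustration of that caller-side shape, using placeholder interfaces rather than the real 0.20 classes:

    import java.io.IOException;

    // "Wal" and "Region" are stand-ins, not the real HLog/HRegion classes.
    interface Wal { void sync(boolean force); }

    interface Region {
      boolean isMetaRegion();
      void put(byte[] row) throws IOException;   // appends to the WAL, no sync
    }

    final class PutHandler {
      private final Wal wal;

      PutHandler(Wal wal) { this.wal = wal; }

      void handlePut(Region region, byte[] row) throws IOException {
        region.put(row);                      // edit sits in the WAL, unsynced
        wal.sync(region.isMetaRegion());      // group-commit point; force for catalog regions
      }
    }

Handlers that append during the same window all block inside sync() and are released together by a single hflush, which is exactly the group commit this backport adds.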

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=919866&r1=919865&r2=919866&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat Mar  6 22:38:30 2010
@@ -107,8 +107,7 @@
             edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
               column));
             System.out.println("Region " + i + ": " + edit);
-            log.append(Bytes.toBytes("" + i), tableName, edit,
-              false, System.currentTimeMillis());
+            log.append(Bytes.toBytes("" + i), tableName, edit, System.currentTimeMillis());
           }
         }
         log.rollWriter();
@@ -174,7 +173,7 @@
         cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      log.append(regionName, tableName, cols, false, System.currentTimeMillis());
+      log.append(regionName, tableName, cols, System.currentTimeMillis());
       long logSeqId = log.startCacheFlush();
       log.completeCacheFlush(regionName, tableName, logSeqId);
       log.close();
@@ -211,4 +210,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}


