hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1159497 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
Date Fri, 19 Aug 2011 03:57:49 GMT
Author: stack
Date: Fri Aug 19 03:57:48 2011
New Revision: 1159497

URL: http://svn.apache.org/viewvc?rev=1159497&view=rev
Log:
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll
is blocked

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1159497&r1=1159496&r2=1159497&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Aug 19 03:57:48 2011
@@ -442,10 +442,14 @@ Release 0.90.5 - Unreleased
                regionserver (Anirudh Todi)
    HBASE-4196  TableRecordReader may skip first row of region (Ming Ma)
    HBASE-4170  createTable java doc needs to be improved (Mubarak Seyed)
-   HBASE-4144  RS does not abort if the initialization of RS fails (ramkrishna.s.vasudevan)
-   HBASE-4148  HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
+   HBASE-4144  RS does not abort if the initialization of RS fails
+               (ramkrishna.s.vasudevan)
+   HBASE-4148  HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata
+               (Jonathan Hsieh)
    HBASE-4159  HBaseServer - IPC Reader threads are not daemons (Douglas
                Campbell)
+   HBASE-4095  Hlog may not be rolled in a long time if checkLowReplication's
+               request of LogRoll is blocked (Jieshan Bean)
 
   IMPROVEMENT
    HBASE-4205  Enhance HTable javadoc (Eric Charles)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1159497&r1=1159496&r2=1159497&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 19 03:57:48 2011
@@ -29,11 +29,11 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Arrays;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -55,7 +55,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -125,8 +130,7 @@ public class HLog implements Syncable {
   private final long blocksize;
   private final String prefix;
   private final Path oldLogDir;
-  private boolean logRollRequested;
-
+  private boolean logRollRunning;
 
   private static Class<? extends Writer> logWriterClass;
   private static Class<? extends Reader> logReaderClass;
@@ -138,7 +142,9 @@ public class HLog implements Syncable {
   }
 
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
-  private int initialReplication; // initial replication factor of SequenceFile.writer
+  // Minimum tolerable replicas, if the actual value is lower than it, 
+  // rollWriter will be triggered
+  private int minTolerableReplication;
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
@@ -186,6 +192,17 @@ public class HLog implements Syncable {
 
   //number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
+  // If live datanode count is lower than the default replicas value,
+  // RollWriter will be triggered in each sync(So the RollWriter will be
+  // triggered one by one in a short time). Using it as a workaround to slow
+  // down the roll frequency triggered by checkLowReplication().
+  private volatile int consecutiveLogRolls = 0;
+  private final int lowReplicationRollLimit;
+
+  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
+  // then disable the rolling in checkLowReplication().
+  // Enable it if the replications recover.
+  private volatile boolean lowReplicationRollEnabled = true;
 
   // If > than this size, roll the log. This is typically 0.95 times the size
   // of the default Hdfs block size.
@@ -353,6 +370,11 @@ public class HLog implements Syncable {
       }
     }
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.minTolerableReplication = conf.getInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication",
+        this.fs.getDefaultReplication());
+    this.lowReplicationRollLimit = conf.getInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
@@ -481,6 +503,7 @@ public class HLog implements Syncable {
     }
     byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
+    this.logRollRunning = true;
     try {
       if (closed) {
         return regionsToFlush;
@@ -491,7 +514,6 @@ public class HLog implements Syncable {
       this.filenum = System.currentTimeMillis();
       Path newPath = computeFilename();
       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
       // Can we get at the dfsclient outputstream?  If an instance of
       // SFLW, it'll have done the necessary reflection to get at the
       // protected field name.
@@ -510,7 +532,6 @@ public class HLog implements Syncable {
         // Clean up current writer.
         Path oldFile = cleanupCurrentWriter(currentFilenum);
         this.writer = nextWriter;
-        this.initialReplication = nextInitialReplication;
         this.hdfs_out = nextHdfsOut;
 
         LOG.info((oldFile != null?
@@ -520,7 +541,6 @@ public class HLog implements Syncable {
             this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
           "New hlog " + FSUtils.getPath(newPath));
         this.numEntries.set(0);
-        this.logRollRequested = false;
       }
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
@@ -538,6 +558,7 @@ public class HLog implements Syncable {
         }
       }
     } finally {
+      this.logRollRunning = false;
       this.cacheFlushLock.unlock();
     }
     return regionsToFlush;
@@ -977,7 +998,7 @@ public class HLog implements Syncable {
       synchronized (this.updateLock) {
         syncTime += System.currentTimeMillis() - now;
         syncOps++;
-        if (!logRollRequested) {
+        if (!this.logRollRunning) {
           checkLowReplication();
           if (this.writer.getLength() > this.logrollsize) {
             requestLogRoll();
@@ -993,18 +1014,44 @@ public class HLog implements Syncable {
   }
 
   private void checkLowReplication() {
-    // if the number of replicas in HDFS has fallen below the initial
+    // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
       int numCurrentReplicas = getLogReplication();
-      if (numCurrentReplicas != 0 &&
-          numCurrentReplicas < this.initialReplication) {
-        LOG.warn("HDFS pipeline error detected. " +
-            "Found " + numCurrentReplicas + " replicas but expecting " +
-            this.initialReplication + " replicas. " +
-            " Requesting close of hlog.");
-        requestLogRoll();
-        logRollRequested = true;
+      if (numCurrentReplicas != 0
+          && numCurrentReplicas < this.minTolerableReplication) {
+        if (this.lowReplicationRollEnabled) {
+          if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
+            LOG.warn("HDFS pipeline error detected. " + "Found "
+                + numCurrentReplicas + " replicas but expecting no less than "
+                + this.minTolerableReplication + " replicas. "
+                + " Requesting close of hlog.");
+            requestLogRoll();
+            // If rollWriter is requested, increase consecutiveLogRolls. Once it
+            // is larger than lowReplicationRollLimit, disable the
+            // LowReplication-Roller
+            this.consecutiveLogRolls++;
+          } else {
+            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
+                + "the total number of live datanodes is lower than the tolerable replicas.");
+            this.consecutiveLogRolls = 0;
+            this.lowReplicationRollEnabled = false;
+          }
+        }
+      } else if (numCurrentReplicas >= this.minTolerableReplication) {
+
+        if (!this.lowReplicationRollEnabled) {
+          // The new writer's log replicas is always the default value.
+          // So we should not enable LowReplication-Roller. If numEntries
+          // is lower than or equals 1, we consider it as a new writer.
+          if (this.numEntries.get() <= 1) {
+            return;
+          }
+          // Once the live datanode number and the replicas return to normal,
+          // enable the LowReplication-Roller.
+          this.lowReplicationRollEnabled = true;
+          LOG.info("LowReplication-Roller was enabled.");
+        }
       }
     } catch (Exception e) {
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
@@ -1262,6 +1309,15 @@ public class HLog implements Syncable {
     return Bytes.equals(METAFAMILY, family);
   }
 
+  /**
+   * Get LowReplication-Roller status
+   * 
+   * @return lowReplicationRollEnabled
+   */
+  public boolean isLowReplicationRollEnabled() {
+    return lowReplicationRollEnabled;
+  }
+
   @SuppressWarnings("unchecked")
   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
      return (Class<? extends HLogKey>)

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1159497&r1=1159496&r2=1159497&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Aug 19 03:57:48 2011
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -29,23 +31,21 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -55,8 +55,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test log deletion as logs are rolled.
  */
@@ -137,6 +135,10 @@ public class TestLogRolling  {
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     TEST_UTIL.startMiniCluster(2);
 
     cluster = TEST_UTIL.getHBaseCluster();
@@ -225,6 +227,30 @@ public class TestLogRolling  {
     }
   }
 
+  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
+      throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row"
+          + String.format("%1$04d", (start + i))));
+      put.add(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
+    }
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while (remaining > 0) {
+      if (log.isLowReplicationRollEnabled() == expect) {
+        break;
+      } else {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+    }
+  }
+  
   /**
    * Give me the HDFS pipeline for this log file
    */
@@ -267,7 +293,7 @@ public class TestLogRolling  {
 
     this.server = cluster.getRegionServer(0);
     this.log = server.getWAL();
-    
+
     // Create the test table and open it
     String tableName = getName();
     HTableDescriptor desc = new HTableDescriptor(tableName);
@@ -320,7 +346,23 @@ public class TestLogRolling  {
     // write some more log data (this should use a new hdfs_out)
     writeData(table, 3);
     assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
-    assertTrue("New log file should have the default replication", log
-        .getLogReplication() == fs.getDefaultReplication());
+    // kill another datanode in the pipeline, so the replicas will be lower than
+    // the configured value 2.
+    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
+    Thread.sleep(10000);
+    batchWriteAndWait(table, 3, false, 10000);
+    assertTrue("LowReplication Roller should've been disabled",
+        !log.isLowReplicationRollEnabled());
+    dfsCluster
+        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
+    dfsCluster.waitActive();
+    // Force roll writer. The new log file will have the default replications,
+    // and the LowReplication Roller will be enabled.
+    log.rollWriter();
+    batchWriteAndWait(table, 13, true, 10000);
+    assertTrue("LowReplication Roller should've been enabled",
+        log.isLowReplicationRollEnabled());
+    assertTrue("New log file should have the default replication",
+        log.getLogReplication() == fs.getDefaultReplication());
   }
 }



Mime
View raw message