hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r921395 - in /hadoop/hbase/branches/0.20: CHANGES.txt src/java/org/apache/hadoop/hbase/regionserver/HLog.java src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java
Date Wed, 10 Mar 2010 15:20:36 GMT
Author: stack
Date: Wed Mar 10 15:20:36 2010
New Revision: 921395

URL: http://svn.apache.org/viewvc?rev=921395&view=rev
Log:
HBASE-2234 Roll Hlog if any datanode in the write pipeline dies

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java
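
In brief: HLog now grabs the DFSOutputStream that sits underneath its SequenceFile writer and, after every sync, asks it (via reflection, since getNumCurrentReplicas() only exists on HDFS builds carrying HDFS-826) how many datanodes remain in the write pipeline. If the count has dropped below the filesystem's default replication, a log roll is requested so subsequent edits go to a fresh, fully replicated file. A minimal standalone sketch of that check follows; the class and member names (PipelineCheckSketch, wrappedStream, expectedReplication) are illustrative, not the actual HLog members.

import java.io.OutputStream;
import java.lang.reflect.Method;

class PipelineCheckSketch {
  private final OutputStream wrappedStream;   // the DFSOutputStream under the SequenceFile writer
  private final Method getNumCurrentReplicas; // null when the HDFS jar lacks HDFS-826
  private final int expectedReplication;      // e.g. fs.getDefaultReplication()

  PipelineCheckSketch(OutputStream wrappedStream, int expectedReplication) {
    this.wrappedStream = wrappedStream;
    this.expectedReplication = expectedReplication;
    Method m = null;
    try {
      // Only present in HDFS builds carrying the HDFS-826 patch.
      m = wrappedStream.getClass().getMethod("getNumCurrentReplicas");
      m.setAccessible(true);
    } catch (NoSuchMethodException e) {
      // No HDFS-826: the check silently degrades to a no-op.
    } catch (SecurityException e) {
      m = null; // could happen on setAccessible()
    }
    this.getNumCurrentReplicas = m;
  }

  /** @return true if the write pipeline has lost a datanode and the log should roll. */
  boolean pipelineDegraded() {
    if (getNumCurrentReplicas == null || wrappedStream == null) {
      return false;
    }
    try {
      Object repl = getNumCurrentReplicas.invoke(wrappedStream);
      if (repl instanceof Integer) {
        int replicas = ((Integer) repl).intValue();
        return replicas != 0 && replicas < expectedReplication;
      }
    } catch (Exception e) {
      // Reflection failure: keep writing rather than abort the sync.
    }
    return false;
  }
}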

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=921395&r1=921394&r2=921395&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Mar 10 15:20:36 2010
@@ -52,6 +52,8 @@ Release 0.20.4 - Unreleased
    HBASE-2249  The PerformanceEvaluation read tests don't take the MemStore
                into account
    HBASE-2277  Update branch to hadoop 0.20.2
+   HBASE-2234  Roll Hlog if any datanode in the write pipeline dies
+               (Nicolas Spiegelberg via Stack)
 
   NEW FEATURES
    HBASE-2257  [stargate] multiuser mode

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=921395&r1=921394&r2=921395&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Wed Mar 10 15:20:36 2010
@@ -22,7 +22,9 @@ package org.apache.hadoop.hbase.regionse
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -118,8 +120,9 @@ public class HLog implements HConstants,
   private final long blocksize;
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
-  private final boolean append;
-  private final Method syncfs;
+  private final Method syncfs;       // refers to SequenceFile.Writer.syncFs()
+  private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
+  private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   private final static Object [] NO_ARGS = new Object []{};
   
   // used to indirectly tell syncFs to force the sync
@@ -138,6 +141,7 @@ public class HLog implements HConstants,
 
   /*
    * Map of regions to first sequence/edit id in their memstore.
+   * The sequenceid is the id of the last write into the current HLog.
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@@ -146,14 +150,17 @@ public class HLog implements HConstants,
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
 
+  // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
   
+  // Number of transactions in the current HLog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
-  // Size of edits written so far. Used figuring when to rotate logs.
+  // Edit size of the current log. Used in figuring when to rotate logs.
   private final AtomicLong editsSize = new AtomicLong(0);
 
-  // If > than this size, roll the log.
+  // If > than this size, roll the log. This is typically 0.95 times the
+  // default HDFS block size.
   private final long logrollsize;
 
   // This lock prevents starting a log roll during a cache flush.
@@ -259,14 +266,37 @@ public class HLog implements HConstants,
       ", enabled=" + this.enabled +
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+
     rollWriter();
+
+    // handle the reflection necessary to call getNumCurrentReplicas()
+    this.getNumCurrentReplicas = null;
+    if(this.hdfs_out != null) {
+      try {
+        this.getNumCurrentReplicas = 
+          this.hdfs_out.getClass().getMethod("getNumCurrentReplicas", 
+                                             new Class<?> []{});
+        this.getNumCurrentReplicas.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        // Thrown if getNumCurrentReplicas() function isn't available
+      } catch (SecurityException e) {
+        // Thrown if we can't get access to getNumCurrentReplicas()
+        this.getNumCurrentReplicas = null; // could happen on setAccessible()
+      }
+    }
+    if(this.getNumCurrentReplicas != null) {
+      LOG.info("Using getNumCurrentReplicas--HDFS-826");
+    } else {
+      LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
+    }
+    
     // Test if syncfs is available.
-    this.append = isAppend(conf);
     Method m = null;
-    if (this.append) {
+    if (isAppend(conf)) {
       try {
+        // function pointer to writer.syncFs()
         m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
-        LOG.debug("Using syncFs--hadoop-4379");
+        LOG.info("Using syncFs--hadoop-4379");
       } catch (SecurityException e) {
         throw new IOException("Failed test for syncfs", e);
       } catch (NoSuchMethodException e) {
@@ -275,7 +305,7 @@ public class HLog implements HConstants,
       }
     }
     this.syncfs = m;
-    
+
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
     Threads.setDaemonThreadRunning(logSyncerThread,
         Thread.currentThread().getName() + ".logSyncer");
@@ -338,7 +368,7 @@ public class HLog implements HConstants,
    *
    * @return If lots of logs, flush the returned regions so next time through
    * we can clean logs. Returns null if nothing to flush.
-   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
+   * @throws FailedLogCloseException
    * @throws IOException
    */
   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
@@ -358,6 +388,26 @@ public class HLog implements HConstants,
         this.filenum = System.currentTimeMillis();
         Path newPath = computeFilename(this.filenum);
         this.writer = createWriter(newPath);
+
+        // Get at the private FSDataOutputStream inside SequenceFile.Writer and
+        // make it accessible.  Our goal is to reach the underlying
+        // DFSOutputStream so that we can detect HDFS pipeline errors proactively.
+        try {
+          final Field field = writer.getClass().getDeclaredField("out");
+          field.setAccessible(true);
+          // get variable: writer.out
+          FSDataOutputStream writer_out = 
+            (FSDataOutputStream)field.get(writer);
+          // writer's OutputStream: writer.out.getWrappedStream()
+          // important: only valid for the lifetime of this.writer
+          this.hdfs_out = writer_out.getWrappedStream();
+        } catch (NoSuchFieldException ex) {
+          this.hdfs_out = null;
+        } catch (Exception ex) {
+          LOG.error("Problem obtaining hdfs_out: " + ex);
+          this.hdfs_out = null;
+        }
+        
         LOG.info((oldFile != null?
             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
             this.numEntries.get() +
@@ -391,6 +441,11 @@ public class HLog implements HConstants,
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
     return createWriter(path, HLogKey.class, KeyValue.class);
   }
+  
+  // usage: see TestLogRolling.java
+  OutputStream getOutputStream() {
+    return this.hdfs_out;
+  }
 
   protected SequenceFile.Writer createWriter(Path path,
       Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
@@ -799,17 +854,34 @@ public class HLog implements HConstants,
         try {
           long now = System.currentTimeMillis();
           this.writer.sync();
-          if (this.append && syncfs != null) {
+          if (this.syncfs != null) {
             try {
              this.syncfs.invoke(this.writer, NO_ARGS); 
             } catch (Exception e) {
               throw new IOException("Reflection", e);
             }
           }
-          syncTime += System.currentTimeMillis() - now;
-          syncOps++;
+          this.syncTime += System.currentTimeMillis() - now;
+          this.syncOps++;
           this.forceSync = false;
           this.unflushedEntries.set(0);
+            
+          // If the number of replicas in HDFS has fallen below the initial
+          // replication value, then roll the log.
+          try {
+            int numCurrentReplicas = getLogReplication();
+            if (numCurrentReplicas != 0 &&
+                numCurrentReplicas < fs.getDefaultReplication()) {
+              LOG.warn("HDFS pipeline error detected. " +
+                  "Found " + numCurrentReplicas + " replicas but expecting " +
+                  fs.getDefaultReplication() + " replicas. " +
+                  "Requesting close of hlog.");
+              requestLogRoll();
+            }
+          } catch (Exception e) {
+            LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas: " + e +
+                     "; still proceeding ahead...");
+          }
         } catch (IOException e) {
           LOG.fatal("Could not append. Requesting close of hlog", e);
           requestLogRoll();
@@ -818,6 +890,29 @@ public class HLog implements HConstants,
       }
     }
   }
+  
+  /**
+   * This method gets the datanode replication count for the current HLog.
+   *
+   * If the pipeline isn't started yet or is empty, you will get the default 
+   * replication factor.  Therefore, if this function returns 0, it means you 
+   * are not properly running with the HDFS-826 patch.
+   * 
+   * @throws Exception
+   */
+  int getLogReplication() throws Exception {
+    if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
+      Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
+      if (repl instanceof Integer) {  
+        return ((Integer)repl).intValue();  
+      }
+    }
+    return 0;
+  }
+  
+  boolean canGetCurReplicas() {
+    return this.getNumCurrentReplicas != null;
+  }
 
   private void requestLogRoll() {
     if (this.listener != null) {
@@ -1225,7 +1320,7 @@ public class HLog implements HConstants,
   * @param conf
   * @return True if append enabled and we have the syncFs in our path.
   */
-  private static boolean isAppend(final HBaseConfiguration conf) {
+  static boolean isAppend(final HBaseConfiguration conf) {
     boolean append = conf.getBoolean("dfs.support.append", false);
     if (append) {
       try {
@@ -1320,7 +1415,7 @@ public class HLog implements HConstants,
   public static String getHLogDirectoryName(HServerInfo info) {
     return getHLogDirectoryName(HServerInfo.getServerName(info));
   }
-
+  
   /**
    * Construct the HLog directory name
    * 
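
The rollWriter() hunk above reaches that DFSOutputStream by reflecting on the private "out" field of the Hadoop 0.20 SequenceFile.Writer and then calling FSDataOutputStream.getWrappedStream(). Pulled out on its own, the extraction looks roughly like the sketch below; the helper class and method names are illustrative, and the returned stream is only valid for the lifetime of the writer it came from.

import java.io.OutputStream;
import java.lang.reflect.Field;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;

final class WrappedStreamSketch {
  /**
   * Returns the OutputStream wrapped by the writer's private FSDataOutputStream
   * ("out" in the 0.20 SequenceFile.Writer), or null if it cannot be reached.
   */
  static OutputStream getWrappedStream(SequenceFile.Writer writer) {
    try {
      Field field = writer.getClass().getDeclaredField("out");
      field.setAccessible(true);
      FSDataOutputStream out = (FSDataOutputStream) field.get(writer);
      return out.getWrappedStream();
    } catch (NoSuchFieldException e) {
      return null; // a SequenceFile.Writer without the expected private field
    } catch (IllegalAccessException e) {
      return null; // setAccessible was refused
    }
  }

  private WrappedStreamSketch() {}
}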

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java?rev=921395&r1=921394&r2=921395&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java Wed Mar 10 15:20:36 2010
@@ -19,11 +19,14 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -32,6 +35,13 @@ import org.apache.hadoop.hbase.client.HB
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
 
 /**
  * Test log deletion as logs are rolled.
@@ -42,6 +52,17 @@ public class TestLogRolling extends HBas
   private HLog log;
   private String tableName;
   private byte[] value;
+
+  // verbose logging on classes that are touched in these tests
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+  }
   
   /**
    * constructor
@@ -70,9 +91,10 @@ public class TestLogRolling extends HBas
   }
 
   // Need to override this setup so we can edit the config before it gets sent
-  // to the cluster startup.
+  // to the HDFS & HBase cluster startup.
   @Override
-  protected void preHBaseClusterSetup() {
+  protected void setUp() throws Exception {
+    /**** configuration for testLogRolling ****/
     // Force a region split after every 768KB
     conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
 
@@ -90,10 +112,23 @@ public class TestLogRolling extends HBas
 
     // Increase the amount of time between client retries
     conf.setLong("hbase.client.pause", 15 * 1000);
-
+    
     // Reduce thread wake frequency so that other threads can get
     // a chance to run.
     conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
+
+    /**** configuration for testLogRollOnDatanodeDeath ****/
+    // make sure log.hflush() calls syncFs() to open a pipeline
+    conf.setBoolean("dfs.support.append", true);
+    // lower the namenode & datanode heartbeat so the namenode 
+    // quickly detects datanode failures
+    conf.setInt("heartbeat.recheck.interval", 5000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    // the namenode might still try to choose the recently-dead datanode 
+    // for a pipeline, so retry the pipeline setup multiple times
+    conf.setInt("dfs.client.block.write.retries", 30);
+    
+    super.setUp();
   }
   
   private void startAndWriteData() throws Exception {
@@ -158,5 +193,94 @@ public class TestLogRolling extends HBas
       throw e;
     }
   }
+  
+  void writeData(HTable table, int rownum) throws Exception {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
+    put.add(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+
+    // sleep to let the log roller run (if it needs to)
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // continue
+    }
+  }
+  
+  /**
+   * Tests that logs are rolled upon detecting datanode death.
+   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
+   * 
+   * @throws Exception
+   */
+  public void testLogRollOnDatanodeDeath() throws Exception {
+    assertTrue("This test requires HLog file replication.", 
+        fs.getDefaultReplication() > 1);
+    
+    // When the META table can be opened, the region servers are running
+    new HTable(conf, HConstants.META_TABLE_NAME);
+    this.server = cluster.getRegionThreads().get(0).getRegionServer();
+    this.log = server.getLog();
+    
+    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
+    // don't run this test without append support (HDFS-200 & HDFS-142)
+    assertTrue("Need append support for this test", HLog.isAppend(conf));
+
+    // increase the datanode count to ensure proper replication when we kill one
+    dfsCluster.startDataNodes(conf, 1, true, null, null);
+    dfsCluster.waitActive();
+    assertTrue(dfsCluster.getDataNodes().size() >= 
+               fs.getDefaultReplication() + 1);
+
+    // Create the test table and open it
+    String tableName = getName();
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    HTable table = new HTable(conf, tableName);
+    table.setAutoFlush(true);
+
+    long curTime = System.currentTimeMillis();
+    long oldFilenum = log.getFilenum();
+    assertTrue("Log should have a timestamp older than now", 
+             curTime > oldFilenum && oldFilenum != -1);
+
+    // normal write
+    writeData(table, 1);
+    assertTrue("The log shouldn't have rolled yet", 
+              oldFilenum == log.getFilenum());
+
+    // kill a datanode in the pipeline to force a log roll on the next sync()
+    OutputStream stm = log.getOutputStream();
+    Method getPipeline = null;
+    for (Method m : stm.getClass().getDeclaredMethods()) {
+      if(m.getName().endsWith("getPipeline")) {
+        getPipeline = m;
+        getPipeline.setAccessible(true);
+        break;
+      }
+    }
+    assertTrue("Need DFSOutputStream.getPipeline() for this test", 
+                getPipeline != null);
+    Object repl = getPipeline.invoke(stm, new Object []{} /*NO_ARGS*/);
+    DatanodeInfo[] pipeline = (DatanodeInfo[]) repl;
+    assertTrue(pipeline.length == fs.getDefaultReplication());
+    DataNodeProperties dnprop = dfsCluster.stopDataNode(pipeline[0].getName());
+    assertTrue(dnprop != null);
+
+    // this write should succeed, but trigger a log roll
+    writeData(table, 2);
+    long newFilenum = log.getFilenum();
+    assertTrue("Missing datanode should've triggered a log roll", 
+              newFilenum > oldFilenum && newFilenum > curTime);
+    
+    // write some more log data (this should use a new hdfs_out)
+    writeData(table, 3);
+    assertTrue("The log should not roll again.", 
+              log.getFilenum() == newFilenum);
+    assertTrue("New log file should have the default replication",
+              log.getLogReplication() == fs.getDefaultReplication());
+  }
 
 }
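
For the new testLogRollOnDatanodeDeath to exercise this path, the mini-cluster has to run with append support and with heartbeat/retry settings aggressive enough that the namenode notices the deliberately killed datanode and the client survives it. The sketch below restates that configuration as a standalone helper; the class name is illustrative and the values mirror setUp() above, so treat them as test-oriented rather than production tuning.

import org.apache.hadoop.conf.Configuration;

final class DatanodeDeathTestConfSketch {
  /** Applies the settings testLogRollOnDatanodeDeath relies on (test-oriented values). */
  static Configuration configure(Configuration conf) {
    // make sure log.hflush() reaches syncFs() so a write pipeline is kept open
    conf.setBoolean("dfs.support.append", true);
    // lower the heartbeat intervals so the namenode quickly detects dead datanodes
    conf.setInt("heartbeat.recheck.interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode may still offer the dead datanode, so retry pipeline setup generously
    conf.setInt("dfs.client.block.write.retries", 30);
    return conf;
  }

  private DatanodeDeathTestConfSketch() {}
}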


