hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1486211 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Fri, 24 May 2013 20:35:51 GMT
Author: tedyu
Date: Fri May 24 20:35:51 2013
New Revision: 1486211

URL: http://svn.apache.org/r1486211
Log:
HBASE-8597 compaction record (probably) can block WAL cleanup forever if region is closed
without edits (Sergey)


Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1486211&r1=1486210&r2=1486211&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	Fri May 24 20:35:51 2013
@@ -832,7 +832,13 @@ class FSHLog implements HLog, Syncable {
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now, HTableDescriptor htd)
   throws IOException {
-    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
+    append(info, tableName, edits, now, htd, true);
+  }
+
+  @Override
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
+    final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
+    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
   }
 
   /**
@@ -862,9 +868,9 @@ class FSHLog implements HLog, Syncable {
    * @throws IOException
    */
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
-      final long now, HTableDescriptor htd, boolean doSync)
+      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
     throws IOException {
-      if (edits.isEmpty()) return this.unflushedEntries.get();;
+      if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
@@ -879,7 +885,7 @@ class FSHLog implements HLog, Syncable {
         // Use encoded name.  Its shorter, guaranteed unique and a subset of
         // actual  name.
         byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
@@ -903,14 +909,7 @@ class FSHLog implements HLog, Syncable {
   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, false);
-  }
-
-  @Override
-  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    UUID clusterId, final long now, HTableDescriptor htd)
-    throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, true);
+    return append(info, tableName, edits, clusterId, now, htd, false, true);
   }
 
   /**

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1486211&r1=1486210&r2=1486211&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	Fri May 24 20:35:51 2013
@@ -264,39 +264,30 @@ public interface HLog {
   public void closeAndDelete() throws IOException;
 
   /**
-   * Only used in tests.
-   *
-   * @param info
-   * @param tableName
-   * @param edits
-   * @param now
-   * @param htd
-   * @throws IOException
+   * Same as {@link #appendNoSync(HRegionInfo, byte[], WALEdit, UUID, long, HTableDescriptor)},
+   * except it causes a sync on the log
    */
   public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
       final long now, HTableDescriptor htd) throws IOException;
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is not flushed after
-   * this transaction is written to the log.
-   *
+   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
+   * transaction is written to the log.
    * @param info
    * @param tableName
    * @param edits
-   * @param clusterId
-   *          The originating clusterId for this edit (for replication)
    * @param now
-   * @return txid of this transaction
-   * @throws IOException
+   * @param htd
+   * @param isInMemstore Whether the record is in memstore. False for system records.
    */
-  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
-      UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
+  public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
+      final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
-   * transaction is written to the log.
+   * regionName, rowname, and log-sequence-id. The HLog is not flushed after
+   * this transaction is written to the log.
    *
    * @param info
    * @param tableName
@@ -308,7 +299,7 @@ public interface HLog {
    * @return txid of this transaction
    * @throws IOException
    */
-  public long append(HRegionInfo info, byte[] tableName, WALEdit edits,
+  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
       UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
 
   public void hsync() throws IOException;

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1486211&r1=1486210&r2=1486211&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java	(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java	Fri May 24 20:35:51 2013
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c) throws IOException {
     WALEdit e = WALEdit.createCompaction(c);
     log.append(info, c.getTableName().toByteArray(), e,
-        EnvironmentEdgeManager.currentTimeMillis(), htd);
+        EnvironmentEdgeManager.currentTimeMillis(), htd, false);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1486211&r1=1486210&r2=1486211&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	Fri May 24 20:35:51 2013
@@ -734,7 +734,16 @@ public class HBaseTestingUtility extends
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
       final int numSlaves, final String[] dataNodeHosts) throws Exception {
-    return startMiniCluster(numMasters, numSlaves, dataNodeHosts, null, null);
+    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts, null, null);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+      final int numSlaves, final int numDataNodes) throws Exception {
+    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
   }
 
   /**
@@ -765,12 +774,24 @@ public class HBaseTestingUtility extends
    * @return Mini hbase cluster instance created.
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
-    final int numSlaves, final String[] dataNodeHosts,
+      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+          throws Exception {
+    return startMiniCluster(
+        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
+   * number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
     Class<? extends HMaster> masterClass,
     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws Exception {
-    int numDataNodes = numSlaves;
-    if ( dataNodeHosts != null && dataNodeHosts.length != 0) {
+    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
       numDataNodes = dataNodeHosts.length;
     }
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1486211&r1=1486210&r2=1486211&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	Fri May 24 20:35:51 2013
@@ -57,9 +57,11 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -170,7 +173,7 @@ public class TestLogRolling  {
 
   @Before
   public void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.startMiniCluster(1, 1, 2);
 
     cluster = TEST_UTIL.getHBaseCluster();
     dfsCluster = TEST_UTIL.getDFSCluster();
@@ -192,18 +195,12 @@ public class TestLogRolling  {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
     this.log = server.getWAL();
 
-    // Create the test table and open it
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc);
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    HTable table = createTestTable(this.tableName);
 
     server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
     this.log = server.getWAL();
     for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
-      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
-      put.add(HConstants.CATALOG_FAMILY, null, value);
-      table.put(put);
+      doPut(table, i);
       if (i % 32 == 0) {
         // After every 32 writes sleep to let the log roller run
         try {
@@ -221,7 +218,7 @@ public class TestLogRolling  {
    * @throws org.apache.hadoop.hbase.exceptions.FailedLogCloseException
    */
   @Test
-  public void testLogRolling() throws FailedLogCloseException, IOException {
+  public void testLogRolling() throws Exception {
     this.tableName = getName();
       startAndWriteData();
       LOG.info("after writing there are " + ((FSHLog) log).getNumLogFiles() + " log files");
@@ -248,9 +245,7 @@ public class TestLogRolling  {
   }
 
   void writeData(HTable table, int rownum) throws IOException {
-    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
-    put.add(HConstants.CATALOG_FAMILY, null, value);
-    table.put(put);
+    doPut(table, rownum);
 
     // sleep to let the log roller run (if it needs to)
     try {
@@ -324,12 +319,7 @@ public class TestLogRolling  {
   /**
    * Tests that logs are rolled upon detecting datanode death
    * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws InvocationTargetException
-   * @throws IllegalAccessException
-   * @throws IllegalArgumentException
-    */
+   */
   @Test
   public void testLogRollOnDatanodeDeath() throws Exception {
     assertTrue("This test requires HLog file replication set to 2.",
@@ -587,5 +577,75 @@ public class TestLogRolling  {
     }
   }
 
+  /**
+   * Tests that logs are deleted when some region has a compaction
+   * record in WAL and no other records. See HBASE-8597.
+   */
+  @Test
+  public void testCompactionRecordDoesntBlockRolling() throws Exception {
+    // When the META table can be opened, the region servers are running
+    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
+
+    String tableName = getName();
+    HTable table = createTestTable(tableName);
+    String tableName2 = tableName + "1";
+    HTable table2 = createTestTable(tableName2);
+
+    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+    this.log = server.getWAL();
+    FSHLog fshLog = (FSHLog)log;
+    HRegion region = server.getOnlineRegions(table2.getTableName()).get(0);
+    Store s = region.getStore(HConstants.CATALOG_FAMILY);
+
+
+    // Put some stuff into table2, to make sure we have some files to compact.
+    for (int i = 1; i <= 2; ++i) {
+      doPut(table2, i);
+      admin.flush(table2.getTableName());
+    }
+    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
+    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumLogFiles());
+    assertEquals(2, s.getStorefilesCount());
+
+    // Roll the log and compact table2, to have compaction record in the 2nd WAL.
+    fshLog.rollWriter();
+    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles());
+    admin.flush(table2.getTableName());
+    region.compactStores();
+    // Wait for compaction in case if flush triggered it before us.
+    Assert.assertNotNull(s);
+    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
+      Threads.sleepWithoutInterrupt(200);
+    }
+    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
+
+    // Write some value to the table so the WAL cannot be deleted until table is flushed.
+    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
+    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
+    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles());
+
+    // Flush table to make latest WAL obsolete; write another record, and roll again.
+    admin.flush(table.getTableName());
+    doPut(table, 1);
+    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
+    assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumLogFiles());
+
+    table.close();
+    table2.close();
+  }
+
+  private void doPut(HTable table, int i) throws IOException {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
+    put.add(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+  }
+
+  private HTable createTestTable(String tableName) throws IOException {
+    // Create the test table and open it
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    return new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
 }
 



Mime
View raw message