hbase-commits mailing list archives

From: la...@apache.org
Subject: [2/2] hbase git commit: HBASE-12457 Regions in transition for a long time when CLOSE interleaves with a slow compaction.
Date: Thu, 13 Nov 2014 06:49:32 GMT
HBASE-12457 Regions in transition for a long time when CLOSE interleaves with a slow compaction.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0e795c1c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0e795c1c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0e795c1c

Branch: refs/heads/branch-1
Commit: 0e795c1cf8621df2d33600f4b33a00344fe5de5a
Parents: b25e5bb
Author: Lars Hofhansl <larsh@apache.org>
Authored: Wed Nov 12 22:49:00 2014 -0800
Committer: Lars Hofhansl <larsh@apache.org>
Committed: Wed Nov 12 22:49:46 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  41 +++++-
 .../hadoop/hbase/regionserver/HStore.java       |  10 ++
 .../hbase/regionserver/SplitTransaction.java    |   1 -
 .../apache/hadoop/hbase/regionserver/Store.java |  12 ++
 .../compactions/DefaultCompactor.java           |   3 +
 .../hbase/regionserver/TestCompactionIO.java    | 126 +++++++++++++++++++
 6 files changed, 189 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
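What the change does, in short: each compaction thread registers itself with its HRegion while it runs, and on close the region waits a bounded time (30s) for compactions before interrupting the registered threads and waiting again. Below is a minimal, standalone sketch of that pattern for orientation. The names in the sketch (CloseSketch, runCompaction, waitForCompactions) are illustrative only and are not HBase APIs; the real change is in HRegion.waitForFlushesAndCompactions(long) and reportCompactionStart()/reportCompactionEnd() further down.

// Minimal sketch of the wait-then-interrupt close pattern (illustrative names, not HBase APIs).
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CloseSketch {
  private final Object writestate = new Object();   // monitor guarding compaction state
  private int compacting = 0;                        // running compactions, guarded by writestate
  private final Map<Thread, String> currentCompactions = new ConcurrentHashMap<Thread, String>();

  /** Runs on the compaction thread, bracketing the actual compaction work. */
  public void runCompaction(String store) throws InterruptedException {
    currentCompactions.put(Thread.currentThread(), store);
    synchronized (writestate) { compacting++; }
    try {
      Thread.sleep(60000);                           // stands in for slow compaction IO
    } finally {
      currentCompactions.remove(Thread.currentThread());
      synchronized (writestate) { compacting--; writestate.notifyAll(); }
    }
  }

  /** Runs on close: wait up to millis, then interrupt stragglers and keep waiting. */
  public void waitForCompactions(long millis) {
    synchronized (writestate) {
      while (compacting > 0) {
        try {
          long start = System.currentTimeMillis();
          writestate.wait(millis);
          if (millis > 0 && System.currentTimeMillis() - start >= millis) {
            for (Thread t : currentCompactions.keySet()) {
              t.interrupt();                         // break any blocking IO in the compaction
            }
            millis = 0;                              // subsequent waits are unbounded
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();        // propagate and stop waiting
          return;
        }
      }
    }
  }
}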


http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d0600c9..210299b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -250,6 +250,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     REPLAY_BATCH_MUTATE, COMPACT_REGION
   }
 
+  private final Map<Thread, Store> currentCompactions = Maps.newConcurrentMap();
+
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -779,6 +781,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // A region can be reopened if failed a split; reset flags
     this.closing.set(false);
     this.closed.set(false);
+    this.writestate.writesEnabled = true;
 
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
@@ -1182,7 +1185,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // region.
       writestate.writesEnabled = false;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
-      waitForFlushesAndCompactions();
+      // give compactions 30s to finish before we start to interrupt
+      waitForFlushesAndCompactions(30000);
     }
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
@@ -1309,6 +1313,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * Exposed for TESTING.
    */
   public void waitForFlushesAndCompactions() {
+    waitForFlushesAndCompactions(0);
+  }
+
+  /**
+   * Wait for all current flushes and compactions of the region to complete.
+   * <p>
+   * Exposed for TESTING.
+   */
+  public void waitForFlushesAndCompactions(long millis) {
     synchronized (writestate) {
       boolean interrupted = false;
       try {
@@ -1316,7 +1329,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           LOG.debug("waiting for " + writestate.compacting + " compactions"
            + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
           try {
-            writestate.wait();
+            long start = EnvironmentEdgeManager.currentTime();
+            writestate.wait(millis);
+            if (millis > 0 && EnvironmentEdgeManager.currentTime() - start >= millis) {
+              // if we waited once for compactions to finish, interrupt them, and try again
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Waited for " + millis
+                  + " ms for compactions to finish on close. Interrupting "
+                  + currentCompactions.size() + " compactions.");
+              }
+              for (Thread t : currentCompactions.keySet()) {
+                // interrupt any current IO in the currently running compactions.
+                t.interrupt();
+              }
+              millis = 0;
+            }
           } catch (InterruptedException iex) {
             // essentially ignore and propagate the interrupt back up
             LOG.warn("Interrupted while waiting");
@@ -5763,7 +5790,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (12 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
 
@@ -6338,6 +6365,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
   }
 
+  public void reportCompactionStart(Store store) {
+    currentCompactions.put(Thread.currentThread(), store);
+  }
+
+  public void reportCompactionEnd(Store store) {
+    currentCompactions.remove(Thread.currentThread());
+  }
+
   public void reportCompactionRequestStart(boolean isMajor){
     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5c06756..8399131 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2242,4 +2242,14 @@ public class HStore implements Store {
   public void deregisterChildren(ConfigurationManager manager) {
     // No children to deregister
   }
+
+  @Override
+  public void reportCompactionStart() {
+    getHRegion().reportCompactionStart(this);
+  }
+
+  @Override
+  public void reportCompactionEnd() {
+    getHRegion().reportCompactionEnd(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 75c2902..74e0fe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -782,7 +782,6 @@ public class SplitTransaction {
         break;
 
       case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
         this.parent.getRegionFileSystem().cleanupSplitsDir();
         break;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 70faff1..cb73810 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -398,4 +398,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
    * @throws IOException
    */
    void refreshStoreFiles() throws IOException;
+
+   /**
+    * report the beginning of a compaction
+    * this must be called from the thread performing the compaction
+    */
+   void reportCompactionStart();
+
+   /**
+    * report the completion of a compaction
+    * this must be called from the thread performing the compaction
+    */
+   void reportCompactionEnd();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index aae3968..a7dc28d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -57,6 +58,7 @@ public class DefaultCompactor extends Compactor {
     boolean cleanSeqId = false;
     IOException e = null;
     try {
+      store.reportCompactionStart();
       InternalScanner scanner = null;
       try {
         /* Include deletes, unless we are doing a compaction of all files */
@@ -108,6 +110,7 @@ public class DefaultCompactor extends Compactor {
           newFiles.add(writer.getPath());
         }
       }
+      store.reportCompactionEnd();
     }
     return newFiles;
   }
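
The two reportCompaction calls above are the registration half of the scheme: the compactor tells its store (and through it the HRegion) which thread is doing the work, and deregisters again at the end of the existing finally block, even when the compaction throws. A small sketch of that bracketing under assumed names (CompactionTracker and copyChunks are illustrative stand-ins, not the HBase interfaces):

import java.io.IOException;
import java.io.InterruptedIOException;

// Illustrative stand-ins; the real patch calls Store.reportCompactionStart()/End()
// from DefaultCompactor.compact().
interface CompactionTracker {
  void reportCompactionStart();
  void reportCompactionEnd();
}

public class CompactorSketch {
  public void compact(CompactionTracker tracker) throws IOException {
    try {
      tracker.reportCompactionStart();   // register the current thread with the region
      copyChunks();                      // stands in for the scan/append loop over store files
    } finally {
      tracker.reportCompactionEnd();     // always deregister, even if the compaction failed
    }
  }

  private void copyChunks() throws IOException {
    // An interrupt delivered while the compaction blocks on IO typically surfaces as an
    // InterruptedIOException, aborting the compaction so the region close can proceed.
    if (Thread.interrupted()) {
      throw new InterruptedIOException("compaction interrupted");
    }
  }
}

The new TestCompactionIO below exercises exactly this path: its CellSink sleeps inside append() and rethrows the interrupt as an InterruptedIOException.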

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e795c1c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
new file mode 100644
index 0000000..efcffaf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionIO.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test compaction IO cancellation.
+ */
+@Category(MediumTests.class)
+public class TestCompactionIO {
+  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
+  private static final CountDownLatch latch = new CountDownLatch(1);
+  /**
+   * verify that a compaction stuck in IO is aborted when we attempt to close a region
+   * @throws Exception
+   */
+  @Test
+  public void testInterruptCompactionIO() throws Exception {
+    byte [] STARTROW = Bytes.toBytes(START_KEY);
+    byte [] COLUMN_FAMILY = fam1;
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
+    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, BlockedCompactor.class.getName());
+    int compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    final HRegion r = UTIL.createLocalHRegion(UTIL.createTableDescriptor("TestCompactionIO"), null, null);
+
+    //Create a couple store files w/ 15KB (over 10KB interval)
+    int jmax = (int) Math.ceil(15.0/compactionThreshold);
+    byte [] pad = new byte[1000]; // 1 KB chunk
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+      p.setDurability(Durability.SKIP_WAL);
+      for (int j = 0; j < jmax; j++) {
+        p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+      }
+      UTIL.loadRegion(r, COLUMN_FAMILY);
+      r.put(p);
+      r.flushcache();
+    }
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+          r.close();
+        } catch (Exception x) {
+          throw new RuntimeException(x);
+        }
+      }
+    }).start();
+    // hangs
+    r.compactStores();
+
+    // ensure that the compaction stopped, all old files are intact,
+    Store s = r.stores.get(COLUMN_FAMILY);
+    assertEquals(compactionThreshold, s.getStorefilesCount());
+    assertTrue(s.getStorefilesSize() > 15*1000);
+    // and no new store files persisted past compactStores()
+    FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+
+    // this is happening after the compaction start, the DefaultCompactor does not
+    // clean tmp files when it encounters an IOException. Should it?
+    assertEquals(1, ls.length);
+  }
+
+  public static class BlockedCompactor extends DefaultCompactor {
+    public BlockedCompactor(final Configuration conf, final Store store) {
+      super(conf, store);
+    }
+    @Override
+    protected boolean performCompaction(InternalScanner scanner,
+        CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
+      CellSink myWriter = new CellSink() {
+        @Override
+        public void append(Cell cell) throws IOException {
+          try {
+            Thread.sleep(100000);
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(ie.getMessage());
+          }
+        }
+      };
+      latch.countDown();
+      return super.performCompaction(scanner, myWriter, smallestReadPoint, cleanSeqId);
+    }
+  }
+}

