hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r1004340 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date: Mon, 04 Oct 2010 17:59:38 GMT
Author: stack
Date: Mon Oct  4 17:59:38 2010
New Revision: 1004340

URL: http://svn.apache.org/viewvc?rev=1004340&view=rev
Log:
HBASE-3043 'hbase-daemon.sh stop regionserver' should kill compactions that are in progress

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
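
In outline, the patch has compactions re-check the region's writes-enabled flag every closeCheckInterval bytes written and abort with an InterruptedIOException once a stop has been requested. A minimal, self-contained sketch of that pattern (stand-in types, not the actual HBase classes):

    import java.io.InterruptedIOException;

    // Sketch of the periodic stop check this commit adds to Store compactions.
    // Cell and Writer are stand-ins; the real code uses KeyValue and
    // StoreFile.Writer, and also deletes the half-written tmp file on abort.
    class CompactionStopCheckSketch {
      static final int CLOSE_CHECK_INTERVAL = 10 * 1000 * 1000; // ~10 MB between checks

      volatile boolean writesEnabled = true; // cleared when a stop is requested

      interface Cell { int getLength(); }
      interface Writer { void append(Cell c); void close(); }

      void compact(Iterable<Cell> cells, Writer writer) throws InterruptedIOException {
        int bytesWritten = 0;
        for (Cell c : cells) {
          writer.append(c);
          bytesWritten += c.getLength();
          if (bytesWritten > CLOSE_CHECK_INTERVAL) {
            bytesWritten = 0; // reset the counter and check again next interval
            if (!writesEnabled) {
              writer.close();
              throw new InterruptedIOException("compaction aborted: stop requested");
            }
          }
        }
      }
    }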

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1004340&r1=1004339&r2=1004340&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Mon Oct  4 17:59:38 2010
@@ -96,14 +96,16 @@ public class CompactSplitThread extends 
       HRegion r = null;
       try {
         r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (r != null && !this.server.isStopped()) {
+        if (r != null) {
           lock.lock();
           try {
-            // Don't interrupt us while we are working
-            byte [] midKey = r.compactStores();
-            if (shouldSplitRegion() && midKey != null &&
-                !this.server.isStopped()) {
-              split(r, midKey);
+            if(!this.server.isStopped()) {
+              // Don't interrupt us while we are working
+              byte [] midKey = r.compactStores();
+              if (shouldSplitRegion() && midKey != null &&
+                  !this.server.isStopped()) {
+                split(r, midKey);
+              }
             }
           } finally {
             lock.unlock();
@@ -208,7 +210,11 @@ public class CompactSplitThread extends 
    */
   void interruptIfNecessary() {
     if (lock.tryLock()) {
-      this.interrupt();
+      try {
+        this.interrupt();
+      } finally {
+        lock.unlock();
+      }
     }
   }
 

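The interruptIfNecessary() hunk above also fixes a latent lock leak: the old code acquired the lock via tryLock() but never released it, so a later compaction could block indefinitely on lock.lock(). The corrected shape, as a self-contained pattern (Thread parameter is a stand-in; the real class extends Thread and interrupts itself):

    import java.util.concurrent.locks.ReentrantLock;

    // General shape of the interruptIfNecessary() fix: interrupt the worker
    // only when no compaction currently holds the lock, and always release
    // what tryLock() acquired (the unlock() was missing before this commit).
    class InterruptIfIdle {
      private final ReentrantLock lock = new ReentrantLock();

      void interruptIfNecessary(Thread worker) {
        if (lock.tryLock()) { // false => a compaction is running; leave it alone
          try {
            worker.interrupt();
          } finally {
            lock.unlock();
          }
        }
      }
    }
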
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1004340&r1=1004339&r2=1004340&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Oct  4 17:59:38 2010
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.util.AbstractList;
@@ -207,7 +208,7 @@ public class HRegion implements HeapSize
     }
   }
 
-  private final WriteState writestate = new WriteState();
+  final WriteState writestate = new WriteState();
 
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
@@ -429,6 +430,12 @@ public class HRegion implements HeapSize
   public boolean isClosing() {
     return this.closing.get();
   }
+  
+  boolean areWritesEnabled() {
+    synchronized(this.writestate) {
+      return this.writestate.writesEnabled;
+    }
+  }
 
    public ReadWriteConsistencyControl getRWCC() {
      return rwcc;
@@ -624,7 +631,7 @@ public class HRegion implements HeapSize
    * Do preparation for pending compaction.
    * @throws IOException
    */
-  private void doRegionCompactionPrep() throws IOException {
+  void doRegionCompactionPrep() throws IOException {
   }
 
   /*
@@ -717,16 +724,24 @@ public class HRegion implements HeapSize
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
         long maxSize = -1;
-        for (Store store: stores.values()) {
-          final Store.StoreSize ss = store.compact(majorCompaction);
-          if (ss != null && ss.getSize() > maxSize) {
-            maxSize = ss.getSize();
-            splitRow = ss.getSplitRow();
+        boolean completed = false;
+        try {
+          for (Store store: stores.values()) {
+            final Store.StoreSize ss = store.compact(majorCompaction);
+            if (ss != null && ss.getSize() > maxSize) {
+              maxSize = ss.getSize();
+              splitRow = ss.getSplitRow();
+            }
           }
+          completed = true;
+        } catch (InterruptedIOException iioe) {
+          LOG.info("compaction interrupted by user: ", iioe);
+        } finally {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          LOG.info(((completed) ? "completed" : "aborted")
+              + " compaction on region " + this 
+              + " after " + StringUtils.formatTimeDiff(now, startTime));
         }
-        String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
-            startTime);
-        LOG.info("compaction completed on region " + this + " in " + timeTaken);
       } finally {
         synchronized (writestate) {
           writestate.compacting = false;

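The new areWritesEnabled() accessor reads the flag under the writestate monitor, which is how a Store in mid-compaction observes a stop requested from another thread. A reduced sketch of the handshake (writestate and writesEnabled are the names from the diff; the hypothetical disableWrites() stands in for the region close path):

    // Reduced sketch of the writestate handshake: the close path clears the
    // flag, and the compaction loops in Store.java below poll it between
    // closeCheckInterval bytes of output.
    class WriteStateSketch {
      static class WriteState { boolean writesEnabled = true; }
      final WriteState writestate = new WriteState();

      // invoked on the shutdown path ('hbase-daemon.sh stop regionserver')
      void disableWrites() {
        synchronized (writestate) { writestate.writesEnabled = false; }
      }

      // polled by a compacting Store to decide whether to keep writing
      boolean areWritesEnabled() {
        synchronized (writestate) { return writestate.writesEnabled; }
      }
    }
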
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1004340&r1=1004339&r2=1004340&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Oct  4 17:59:38 2010
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -90,6 +91,8 @@ public class Store implements HeapSize {
   protected long ttl;
   private long majorCompactionTime;
   private int maxFilesToCompact;
+  /* how many bytes to write between status checks */
+  static int closeCheckInterval = 0; 
   private final long desiredMaxFileSize;
   private volatile long storeSize = 0L;
   private final Object flushLock = new Object();
@@ -192,6 +195,10 @@ public class Store implements HeapSize {
     }
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+    if (Store.closeCheckInterval == 0) {
+      Store.closeCheckInterval = conf.getInt(
+          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
+    }
     this.storefiles = sortAndClone(loadStoreFiles());
   }
 
@@ -813,23 +820,43 @@ public class Store implements HeapSize {
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
     try {
+    // NOTE: the majority of the time for a compaction is spent in this section
     if (majorCompaction) {
       InternalScanner scanner = null;
       try {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
         scanner = new StoreScanner(this, scan, scanners);
+        int bytesWritten = 0;
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
         ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
         while (scanner.next(kvs)) {
-          // output to writer:
-          for (KeyValue kv : kvs) {
-            if (writer == null) {
-              writer = createWriterInTmp(maxKeyCount, 
-                this.compactionCompression);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount, 
+              this.compactionCompression);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              writer.append(kv);
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  if (!this.region.areWritesEnabled()) {
+                    writer.close();
+                    fs.delete(writer.getPath(), false);
+                    throw new InterruptedIOException(
+                        "Aborting compaction of store " + this + 
+                        " in region " + this.region + 
+                        " because user requested stop.");
+                  }
+                }
+              }
             }
-            writer.append(kv);
           }
           kvs.clear();
         }
@@ -842,9 +869,29 @@ public class Store implements HeapSize {
       MinorCompactingStoreScanner scanner = null;
       try {
         scanner = new MinorCompactingStoreScanner(this, scanners);
-        writer = createWriterInTmp(maxKeyCount);
-        while (scanner.next(writer)) {
-          // Nothing to do
+        if (scanner.peek() != null) {
+          writer = createWriterInTmp(maxKeyCount);
+          int bytesWritten = 0;
+          while (scanner.peek() != null) {
+            KeyValue kv = scanner.next();
+            writer.append(kv);
+
+            // check periodically to see if a system stop is requested
+            if (Store.closeCheckInterval > 0) {
+              bytesWritten += kv.getLength();
+              if (bytesWritten > Store.closeCheckInterval) {
+                bytesWritten = 0;
+                if (!this.region.areWritesEnabled()) {
+                  writer.close();
+                  fs.delete(writer.getPath(), false);
+                  throw new InterruptedIOException(
+                      "Aborting compaction of store " + this + 
+                      " in region " + this.region + 
+                      " because user requested stop.");
+                }
+              }
+            }
+          }
         }
       } finally {
         if (scanner != null)

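Note that the stop-check block is duplicated verbatim between the major- and minor-compaction loops above; a hedged refactoring sketch (hypothetical helper, not part of this commit) of how both loops could share it:

    import java.io.IOException;
    import java.io.InterruptedIOException;

    // Hypothetical helper (not in this commit) folding the duplicated stop
    // check into one method. Returns the running byte count, zeroed after
    // every check so the next interval starts fresh.
    class StopCheckHelper {
      static final int CLOSE_CHECK_INTERVAL = 10 * 1000 * 1000; // ~10 MB

      interface Abortable {
        boolean writesEnabled();
        void abortWriter() throws IOException; // close writer, delete tmp file
      }

      static int maybeAbort(int bytesWritten, int appendedLength, Abortable store)
          throws IOException {
        bytesWritten += appendedLength;
        if (bytesWritten <= CLOSE_CHECK_INTERVAL) {
          return bytesWritten;
        }
        if (!store.writesEnabled()) {
          store.abortWriter();
          throw new InterruptedIOException("compaction aborted: stop requested");
        }
        return 0;
      }
    }
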
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1004340&r1=1004339&r2=1004340&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Mon Oct  4 17:59:38 2010
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +34,18 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 
 /**
  * Test compactions
@@ -217,17 +224,94 @@ public class TestCompaction extends HBas
     }
     assertTrue(containsStartRow);
     assertTrue(count == 3);
-    // Do a simple TTL test.
+    
+    // Multiple versions allowed for an entry, so the delete isn't enough
+    // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttlInSeconds = 1;
     for (Store store: this.r.stores.values()) {
       store.ttl = ttlInSeconds * 1000;
     }
     Thread.sleep(ttlInSeconds * 1000);
+
     r.compactStores(true);
     count = count();
     assertTrue(count == 0);
   }
+  
+
+  /**
+   * Verify that you can stop a long-running compaction 
+   * (used during RS shutdown)
+   * @throws Exception
+   */
+  public void testInterruptCompaction() throws Exception {
+    assertEquals(0, count());
 
+    // lower the polling interval for this test
+    int origWI = Store.closeCheckInterval;
+    Store.closeCheckInterval = 10*1000; // 10 KB
+
+    try {
+      // Create a couple store files w/ 15KB (over 10KB interval)
+      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+      byte [] pad = new byte[1000]; // 1 KB chunk
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        HRegionIncommon loader = new HRegionIncommon(r);
+        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        for (int j = 0; j < jmax; j++) {
+          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+        }
+        addContent(loader, Bytes.toString(COLUMN_FAMILY));
+        loader.put(p);
+        loader.flushcache();
+      }
+      
+      HRegion spyR = spy(r);
+      doAnswer(new Answer() {
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          r.writestate.writesEnabled = false;
+          return invocation.callRealMethod();
+        }
+      }).when(spyR).doRegionCompactionPrep();
+
+      // force a minor compaction, but not before requesting a stop
+      spyR.compactStores();
+      
+      // ensure that the compaction stopped, all old files are intact, 
+      Store s = r.stores.get(COLUMN_FAMILY);
+      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+      assertTrue(s.getStorefilesSize() > 15*1000);
+      // and no new store files persisted past compactStores()
+      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
+      assertEquals(0, ls.length);
+
+    } finally {
+      // don't mess up future tests
+      r.writestate.writesEnabled = true;
+      Store.closeCheckInterval = origWI;
+
+      // Delete all Store information once done using
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        byte [][] famAndQf = {COLUMN_FAMILY, null};
+        delete.deleteFamily(famAndQf[0]);
+        r.delete(delete, null, true);
+      }
+      r.flushcache();
+
+      // Multiple versions allowed for an entry, so the delete isn't enough
+      // Lower TTL and expire to ensure that all our entries have been wiped
+      final int ttlInSeconds = 1;
+      for (Store store: this.r.stores.values()) {
+        store.ttl = ttlInSeconds * 1000;
+      }
+      Thread.sleep(ttlInSeconds * 1000);
+      
+      r.compactStores(true);
+      assertEquals(0, count());
+    }
+  }
+  
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.

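The test's key trick is the Mockito spy on doRegionCompactionPrep(): the stubbed answer flips writesEnabled off immediately before the compaction write loop starts, so the abort path is exercised deterministically instead of racing a real shutdown. A reduced sketch of that hook (Worker, State, and hookStop are stand-ins; the real test spies HRegion):

    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    // Sketch of the spy hook used by testInterruptCompaction(): intercept a
    // prep method so the stop flag is cleared just before the work loop runs.
    class SpyHookSketch {
      static class State { volatile boolean writesEnabled = true; }

      static class Worker {
        State state = new State();
        void prep() { /* stands in for doRegionCompactionPrep() */ }
        void run() { prep(); /* ... write loop polls state.writesEnabled ... */ }
      }

      static Worker hookStop(final Worker real) {
        Worker spied = spy(real);
        doAnswer(new Answer<Object>() {
          public Object answer(InvocationOnMock invocation) throws Throwable {
            // the spy copies the 'state' reference, so this flip is visible
            // to it -- the same reason the real test can mutate r.writestate
            real.state.writesEnabled = false;
            return invocation.callRealMethod();
          }
        }).when(spied).prep();
        return spied;
      }
    }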

