hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1542518 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/
Date Sat, 16 Nov 2013 14:53:15 GMT
Author: tedyu
Date: Sat Nov 16 14:53:15 2013
New Revision: 1542518

URL: http://svn.apache.org/r1542518
Log:
HBASE-9949 Fix the race condition between Compaction and StoreScanner.init


Added:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1542518&r1=1542517&r2=1542518&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
Sat Nov 16 14:53:15 2013
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -74,6 +75,8 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -1379,6 +1382,8 @@ public class HStore implements Store {
       // scenario that could have happened if continue to hold the lock.
       notifyChangedReadersObservers();
       // At this point the store will use new files for all scanners.
+      InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE,
+          new Object[] { StoreScannerCompactionRace.COMPACT_COMPLETE.ordinal() }); // test hook
 
       // let the archive util decide if we should archive or delete the files
       LOG.debug("Removing store files after compaction...");

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1542518&r1=1542517&r2=1542518&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
Sat Nov 16 14:53:15 2013
@@ -37,10 +37,11 @@ import org.apache.hadoop.hbase.KeyValueU
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@@ -96,6 +97,13 @@ public class StoreScanner extends NonLaz
   // A flag whether use pread for scan
   private boolean scanUsePread = false;
 
+  // used by the injection framework to test race between StoreScanner construction and compaction
+  enum StoreScannerCompactionRace {
+    BEFORE_SEEK,      // reported by the constructors right before the scanners are seeked
+    AFTER_SEEK,       // reported by the constructors once the KeyValueHeap has been built
+    COMPACT_COMPLETE  // per TestStoreScanner, meant for HStore's compaction completion
+  }
+  
   /** An internal constructor. */
   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
       final NavigableSet<byte[]> columns, long ttl, int minVersions) {
@@ -149,6 +157,8 @@ public class StoreScanner extends NonLaz
         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
         oldestUnexpiredTS);
 
+    this.store.addChangedReaderObserver(this);
+
     // Pass columns to try to filter out unnecessary StoreFiles.
     List<KeyValueScanner> scanners = getScannersNoCompaction();
 
@@ -156,6 +166,8 @@ public class StoreScanner extends NonLaz
     // key does not exist, then to the start of the next matching Row).
     // Always check bloom filter to optimize the top row seek for delete
     // family marker.
+    InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[]
{
+        StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       for (KeyValueScanner scanner : scanners) {
         scanner.requestSeek(matcher.getStartKey(), false, true);
@@ -178,8 +190,8 @@ public class StoreScanner extends NonLaz
 
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.getComparator());
-
-    this.store.addChangedReaderObserver(this);
+    InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[]
{
+        StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
   }
 
   /**
@@ -229,9 +241,13 @@ public class StoreScanner extends NonLaz
           earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
     }
 
+    this.store.addChangedReaderObserver(this);
+
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
     scanners = selectScannersFrom(scanners);
 
+    InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[]
{
+        StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
     // Seek all scanners to the initial key
     if (!isParallelSeekEnabled) {
       for (KeyValueScanner scanner : scanners) {
@@ -243,6 +259,8 @@ public class StoreScanner extends NonLaz
 
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.getComparator());
+    InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[]
{
+        StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
   }
 
   /** Constructor for testing. */
@@ -263,6 +281,10 @@ public class StoreScanner extends NonLaz
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
 
+    // In unit tests, the store could be null
+    if (this.store != null) {
+      this.store.addChangedReaderObserver(this);
+    }
     // Seek all scanners to the initial key
     if (!isParallelSeekEnabled) {
       for (KeyValueScanner scanner : scanners) {

Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542518&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
(added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
Sat Nov 16 14:53:15 2013
@@ -0,0 +1,32 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Enumeration of all injection events that supervised code can report.
+ * When defining new events, please PREFIX the name
+ * with the supervised class.
+ *
+ * Please see InjectionHandler, which dispatches these events.
+ */
+public enum InjectionEvent {
+  // Reported by StoreScanner and HStore; the argument is a StoreScannerCompactionRace ordinal
+  STORESCANNER_COMPACTION_RACE
+}

Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java?rev=1542518&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
(added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
Sat Nov 16 14:53:15 2013
@@ -0,0 +1,171 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The InjectionHandler is an object provided to a class,
+ * which can perform custom actions for JUnit testing.
+ * A JUnit test can install a custom version of the handler.
+ * For example, let's say we want to supervise an FSImage object:
+ *
+ * <pre>{@code
+ * // JUnit test code
+ * class MyInjectionHandler extends InjectionHandler {
+ *   protected void _processEvent(InjectionEvent event,
+ *       Object... args) {
+ *     if (event == InjectionEvent.MY_EVENT) {
+ *       LOG.info("Handling my event for fsImage: "
+ *         + args[0].toString());
+ *     }
+ *   }
+ * }
+ *
+ * public void testMyEvent() {
+ *   InjectionHandler ih = new MyInjectionHandler();
+ *   InjectionHandler.set(ih);
+ *   ...
+ *
+ *   InjectionHandler.clear();
+ * }
+ *
+ * // supervised code example
+ *
+ * class FSImage {
+ *
+ *   private void doSomething() {
+ *     ...
+ *     if (condition1 && InjectionHandler.trueCondition(MY_EVENT1)) {
+ *       ...
+ *     }
+ *     if (condition2 || condition3
+ *       || InjectionHandler.falseCondition(MY_EVENT1)) {
+ *       ...
+ *     }
+ *     ...
+ *     InjectionHandler.processEvent(MY_EVENT2, this);
+ *     ...
+ *     try {
+ *       read();
+ *       InjectionHandler.processEventIO(MY_EVENT3, this, object);
+ *       // might throw an exception when testing
+ *     } catch (IOException e) {
+ *       LOG.info("Exception");
+ *     }
+ *     ...
+ *   }
+ *   ...
+ * }
+ * }</pre>
+ *
+ * Each unit test should use a unique event type.
+ * The types can be defined by adding them to
+ * InjectionEvent class.
+ *
+ * methods:
+ *
+ * // simulate actions
+ * void processEvent()
+ * // simulate exceptions
+ * void processEventIO() throws IOException
+ *
+ * // simulate conditions
+ * boolean trueCondition()
+ * boolean falseCondition()
+ *
+ * A class extending InjectionHandler overrides
+ * the respective protected methods it needs:
+ * _processEvent()
+ * _processEventIO()
+ * _trueCondition()
+ * _falseCondition()
+ */
+public class InjectionHandler {
+
+  private static final Log LOG = LogFactory.getLog(InjectionHandler.class);
+
+  // the only handler to which everyone reports; volatile so every thread sees set()/clear()
+  private static volatile InjectionHandler handler = new InjectionHandler();
+
+  // can not be instantiated outside, unless a testcase extends it
+  protected InjectionHandler() {}
+
+  // Hooks for tests to override; the defaults (used in production) do nothing.
+
+  protected void _processEvent(InjectionEvent event, Object... args) {
+    // by default do nothing
+  }
+
+  protected void _processEventIO(InjectionEvent event, Object... args) throws IOException {
+    // by default do nothing
+  }
+
+  protected boolean _trueCondition(InjectionEvent event, Object... args) {
+    return true; // neutral in conjunction
+  }
+
+  protected boolean _falseCondition(InjectionEvent event, Object... args) {
+    return false; // neutral in alternative
+  }
+
+  ////////////////////////////////////////////////////////////
+
+  /**
+   * Set to the empty/production implementation.
+   */
+  public static void clear() {
+    handler = new InjectionHandler();
+  }
+
+  /**
+   * Set custom implementation of the handler; null restores the default no-op handler.
+   */
+  public static void set(InjectionHandler custom) {
+    LOG.warn("WARNING: SETTING INJECTION HANDLER" +
+      " - THIS SHOULD NOT BE USED IN PRODUCTION !!!");
+    handler = (custom == null) ? new InjectionHandler() : custom;
+  }
+
+  /*
+  * Static methods for reporting to the handler
+  */
+
+  public static void processEvent(InjectionEvent event, Object... args) {
+    handler._processEvent(event, args);
+  }
+
+  public static void processEventIO(InjectionEvent event, Object... args)
+    throws IOException {
+    handler._processEventIO(event, args);
+  }
+
+  public static boolean trueCondition(InjectionEvent event, Object... args) {
+    return handler._trueCondition(event, args);
+  }
+
+  public static boolean falseCondition(InjectionEvent event, Object... args) {
+    return handler._falseCondition(event, args);
+  }
+}
+

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1542518&r1=1542517&r2=1542518&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
(original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Sat Nov 16 14:53:15 2013
@@ -27,18 +27,28 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.experimental.categories.Category;
 
 // Can't be small as it plays with EnvironmentEdgeManager
@@ -501,6 +511,110 @@ public class TestStoreScanner extends Te
     assertEquals(false, scanner.next(results));
   }
 
+  /** Starts a compaction the first time a StoreScanner reports that it is about to seek. */
+  private class StoreScannerCompactionRaceCondition extends InjectionHandler {
+    final Store store;
+    // One-shot flags, one per injection point. Atomics replace Boolean fields used as monitors:
+    // reassigning such a field changes the locked object (all three began as Boolean.FALSE).
+    final java.util.concurrent.atomic.AtomicBoolean beforeSeek =
+        new java.util.concurrent.atomic.AtomicBoolean();
+    final java.util.concurrent.atomic.AtomicBoolean afterSeek =
+        new java.util.concurrent.atomic.AtomicBoolean();
+    final java.util.concurrent.atomic.AtomicBoolean compactionComplete =
+        new java.util.concurrent.atomic.AtomicBoolean();
+    final int waitTime;
+    volatile boolean doneSeeking = false; // set by the scanning thread, read by the compaction
+    public volatile Future<Void> f;
+    StoreScannerCompactionRaceCondition(Store s, int waitTime) {
+      this.store = s;
+      this.waitTime = waitTime;
+    }
+    @Override
+    protected void _processEvent(InjectionEvent event, Object... args) {
+      // Only events carrying a single Integer (the injection point's ordinal) are handled.
+      if (event != InjectionEvent.STORESCANNER_COMPACTION_RACE || args == null
+          || args.length != 1 || !(args[0] instanceof Integer)) {
+        return;
+      }
+      switch (StoreScannerCompactionRace.values()[(Integer) args[0]]) {
+      case BEFORE_SEEK:
+        // Inside StoreScanner ctor before seek: the first caller kicks off a compaction.
+        if (beforeSeek.compareAndSet(false, true)) {
+          java.util.concurrent.ExecutorService pool = Executors.newSingleThreadExecutor();
+          f = pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              StoreScanner.enableLazySeekGlobally(false);
+              try {
+                ((HStore)store).compactRecentForTestingAssumingDefaultPolicy(
+                  store.getStorefiles().size() / 2);
+              } finally { // restore the global flag even if the compaction throws
+                StoreScanner.enableLazySeekGlobally(true);
+              }
+              return null;
+            }
+          });
+          pool.shutdown(); // lets the queued compaction finish, then releases the thread
+          Threads.sleep(waitTime);
+        }
+        break;
+      case AFTER_SEEK:
+        // Inside StoreScanner ctor after seek.
+        if (afterSeek.compareAndSet(false, true)) {
+          this.doneSeeking = true;
+        }
+        break;
+      case COMPACT_COMPLETE:
+        // Inside HStore.completeCompaction
+        if (compactionComplete.compareAndSet(false, true)) {
+          assertTrue(doneSeeking);
+        }
+        break;
+      }
+    }
+  }
+
+  /*
+   *  Verifies that there is no race condition between StoreScanner construction and compaction.
+   *  This is done through 3 injection points:
+   *  1. before seek operation in StoreScanner ctor
+   *  2. after seek operation in StoreScanner ctor
+   *  3. after compaction completion
+   */
+  public void testCompactionRaceCondition() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+    KeyValueScanner scanner = null;
+    try {
+      byte[] t = Bytes.toBytes("tbl"), cf = Bytes.toBytes("cf");
+      HTable table = util.createTable(t, cf);
+      util.loadTable(table, cf);
+      util.flush();
+      util.loadTable(table, cf);
+      util.flush();
+      List<HRegion> regions = util.getHBaseCluster().getRegions(t);
+      assertTrue(regions.size() == 1);
+      Store s = regions.get(0).getStore(cf);
+      StoreScannerCompactionRaceCondition ih = new StoreScannerCompactionRaceCondition(s, 5);
+      InjectionHandler.set(ih); // route injection events to the race handler
+      // Create a StoreScanner over a bounded row range (start row, then stop row).
+      TreeSet<byte[]> set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      set.add(cf);
+      Scan scanSpec = new Scan();
+      scanSpec.setStartRow(Bytes.toBytes("hjfsd"));
+      scanSpec.setStopRow(Bytes.toBytes("zjfsd"));
+      scanner = s.getScanner(scanSpec, set, s.getSmallestReadPoint());
+      ih.f.get();
+    } finally {
+      // Clear injection handling and shutdown the minicluster, even when the test fails.
+      InjectionHandler.clear();
+      if (scanner != null) {
+        scanner.close();
+      }
+      util.shutdownMiniCluster();
+    }
+  }
+
   public void testDeleteMarkerLongevity() throws Exception {
     try {
       final long now = System.currentTimeMillis();



Mime
View raw message