Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B331108C3 for ; Sat, 16 Nov 2013 14:53:49 +0000 (UTC) Received: (qmail 92562 invoked by uid 500); 16 Nov 2013 14:53:47 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 92471 invoked by uid 500); 16 Nov 2013 14:53:40 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 92461 invoked by uid 99); 16 Nov 2013 14:53:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Nov 2013 14:53:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Nov 2013 14:53:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2128823888FE; Sat, 16 Nov 2013 14:53:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1542518 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/ Date: Sat, 16 Nov 2013 14:53:15 -0000 To: commits@hbase.apache.org From: tedyu@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131116145316.2128823888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tedyu Date: Sat Nov 16 14:53:15 2013 New Revision: 1542518 URL: http://svn.apache.org/r1542518 Log: HBASE-9949 Fix the race condition between Compaction and StoreScanner.init Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1542518&r1=1542517&r2=1542518&view=diff ============================================================================== --- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original) +++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Nov 16 14:53:15 2013 @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.io.hfile. import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -74,6 +75,8 @@ import org.apache.hadoop.hbase.util.Byte import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -1379,6 +1382,8 @@ public class HStore implements Store { // scenario that could have happened if continue to hold the lock. notifyChangedReadersObservers(); // At this point the store will use new files for all scanners. + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); // let the archive util decide if we should archive or delete the files LOG.debug("Removing store files after compaction..."); Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1542518&r1=1542517&r2=1542518&view=diff ============================================================================== --- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original) +++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Nov 16 14:53:15 2013 @@ -37,10 +37,11 @@ import org.apache.hadoop.hbase.KeyValueU import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream @@ -96,6 +97,13 @@ public class StoreScanner extends NonLaz // A flag whether use pread for scan private boolean scanUsePread = false; + // used by the injection framework to test race between StoreScanner construction and compaction + enum StoreScannerCompactionRace { + BEFORE_SEEK, + AFTER_SEEK, + COMPACT_COMPLETE + } + /** An internal constructor. */ protected StoreScanner(Store store, boolean cacheBlocks, Scan scan, final NavigableSet columns, long ttl, int minVersions) { @@ -149,6 +157,8 @@ public class StoreScanner extends NonLaz ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); + this.store.addChangedReaderObserver(this); + // Pass columns to try to filter out unnecessary StoreFiles. List scanners = getScannersNoCompaction(); @@ -156,6 +166,8 @@ public class StoreScanner extends NonLaz // key does not exist, then to the start of the next matching Row). // Always check bloom filter to optimize the top row seek for delete // family marker. + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); if (explicitColumnQuery && lazySeekEnabledGlobally) { for (KeyValueScanner scanner : scanners) { scanner.requestSeek(matcher.getStartKey(), false, true); @@ -178,8 +190,8 @@ public class StoreScanner extends NonLaz // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.getComparator()); - - this.store.addChangedReaderObserver(this); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } /** @@ -229,9 +241,13 @@ public class StoreScanner extends NonLaz earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); } + this.store.addChangedReaderObserver(this); + // Filter the list of scanners using Bloom filters, time range, TTL, etc. scanners = selectScannersFrom(scanners); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); // Seek all scanners to the initial key if (!isParallelSeekEnabled) { for (KeyValueScanner scanner : scanners) { @@ -243,6 +259,8 @@ public class StoreScanner extends NonLaz // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.getComparator()); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } /** Constructor for testing. */ @@ -263,6 +281,10 @@ public class StoreScanner extends NonLaz this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); + // In unit tests, the store could be null + if (this.store != null) { + this.store.addChangedReaderObserver(this); + } // Seek all scanners to the initial key if (!isParallelSeekEnabled) { for (KeyValueScanner scanner : scanners) { Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542518&view=auto ============================================================================== --- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (added) +++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Sat Nov 16 14:53:15 2013 @@ -0,0 +1,32 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +/** + * Enumeration of all injection events. + * When defining new events, please PREFIX the name + * with the supervised class. + * + * Please see InjectionHandler. + */ +public enum InjectionEvent { + // Injection into Store.java + STORESCANNER_COMPACTION_RACE +} Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java?rev=1542518&view=auto ============================================================================== --- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java (added) +++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java Sat Nov 16 14:53:15 2013 @@ -0,0 +1,171 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The InjectionHandler is an object provided to a class, + * which can perform custom actions for JUnit testing. + * JUnit test can implement custom version of the handler. + * For example, let's say we want to supervise FSImage object: + * + * + * // JUnit test code + * class MyInjectionHandler extends InjectionHandler { + * protected void _processEvent(InjectionEvent event, + * Object... args) { + * if (event == InjectionEvent.MY_EVENT) { + * LOG.info("Handling my event for fsImage: " + * + args[0].toString()); + * } + * } + * } + * + * public void testMyEvent() { + * InjectionHandler ih = new MyInjectionHandler(); + * InjectionHandler.set(ih); + * ... + * + * InjectionHandler.clear(); + * } + * + * // supervised code example + * + * class FSImage { + * + * private doSomething() { + * ... + * if (condition1 && InjectionHandler.trueCondition(MY_EVENT1) { + * ... + * } + * if (condition2 || condition3 + * || InjectionHandler.falseCondition(MY_EVENT1) { + * ... + * } + * ... + * InjectionHandler.processEvent(MY_EVENT2, this) + * ... + * try { + * read(); + * InjectionHandler.processEventIO(MY_EVENT3, this, object); + * // might throw an exception when testing + * catch (IOEXception) { + * LOG.info("Exception") + * } + * ... + * } + * ... + * } + * + * + * Each unit test should use a unique event type. + * The types can be defined by adding them to + * InjectionEvent class. + * + * methods: + * + * // simulate actions + * void processEvent() + * // simulate exceptions + * void processEventIO() throws IOException + * + * // simulate conditions + * boolean trueCondition() + * boolean falseCondition() + * + * The class implementing InjectionHandler must + * override respective protected methods + * _processEvent() + * _processEventIO() + * _trueCondition() + * _falseCondition() + */ +public class InjectionHandler { + + private static final Log LOG = LogFactory.getLog(InjectionHandler.class); + + // the only handler to which everyone reports + private static InjectionHandler handler = new InjectionHandler(); + + // can not be instantiated outside, unless a testcase extends it + protected InjectionHandler() {} + + // METHODS FOR PRODUCTION CODE + + protected void _processEvent(InjectionEvent event, Object... args) { + // by default do nothing + } + + protected void _processEventIO(InjectionEvent event, Object... args) throws IOException{ + // by default do nothing + } + + protected boolean _trueCondition(InjectionEvent event, Object... args) { + return true; // neutral in conjunction + } + + protected boolean _falseCondition(InjectionEvent event, Object... args) { + return false; // neutral in alternative + } + + //////////////////////////////////////////////////////////// + + /** + * Set to the empty/production implementation. + */ + public static void clear() { + handler = new InjectionHandler(); + } + + /** + * Set custom implementation of the handler. + */ + public static void set(InjectionHandler custom) { + LOG.warn("WARNING: SETTING INJECTION HANDLER" + + " - THIS SHOULD NOT BE USED IN PRODUCTION !!!"); + handler = custom; + } + + /* + * Static methods for reporting to the handler + */ + + public static void processEvent(InjectionEvent event, Object... args) { + handler._processEvent(event, args); + } + + public static void processEventIO(InjectionEvent event, Object... args) + throws IOException { + handler._processEventIO(event, args); + } + + public static boolean trueCondition(InjectionEvent event, Object... args) { + return handler._trueCondition(event, args); + } + + public static boolean falseCondition(InjectionEvent event, Object... args) { + return handler._falseCondition(event, args); + } +} + Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1542518&r1=1542517&r2=1542518&view=diff ============================================================================== --- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original) +++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat Nov 16 14:53:15 2013 @@ -27,18 +27,28 @@ import java.util.Arrays; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import junit.framework.TestCase; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; +import org.apache.hadoop.hbase.util.Threads; import org.junit.experimental.categories.Category; // Can't be small as it plays with EnvironmentEdgeManager @@ -501,6 +511,110 @@ public class TestStoreScanner extends Te assertEquals(false, scanner.next(results)); } + private class StoreScannerCompactionRaceCondition extends InjectionHandler { + final Store store; + Boolean beforeSeek = false; + Boolean afterSeek = false; + Boolean compactionComplete = false; + final int waitTime; + boolean doneSeeking = false; + public Future f; + StoreScannerCompactionRaceCondition(Store s, int waitTime) { + this.store = s; + this.waitTime = waitTime; + } + + protected void _processEvent(InjectionEvent event, Object... args) { + if (event == InjectionEvent.STORESCANNER_COMPACTION_RACE) { + // To prevent other scanners which are not supposed to be tested from taking this code path. + if ((args instanceof Object[]) && (args.length == 1) + && (args[0] instanceof Integer)) { + StoreScannerCompactionRace sscr = StoreScannerCompactionRace.values()[(Integer)args[0]]; + switch (sscr) { + case BEFORE_SEEK : + // Inside StoreScanner ctor before seek. + synchronized (beforeSeek) { + if (!beforeSeek) { + beforeSeek = true; + f = Executors.newSingleThreadExecutor().submit(new Callable() { + @Override + public Void call() throws Exception { + StoreScanner.enableLazySeekGlobally(false); + ((HStore)store).compactRecentForTestingAssumingDefaultPolicy( + store.getStorefiles().size() / 2); + StoreScanner.enableLazySeekGlobally(true); + return null; + } + }); + Threads.sleep(waitTime); + } + } + break; + case AFTER_SEEK: + // Inside StoreScanner ctor after seek. + synchronized (afterSeek) { + if (!afterSeek) { + afterSeek = true; + this.doneSeeking = true; + } + } + break; + case COMPACT_COMPLETE: + // Inside HStore.completeCompaction + synchronized (compactionComplete) { + if (!compactionComplete) { + compactionComplete = true; + assertTrue(doneSeeking); + } + } + break; + } + } + } + } + } + + /* + * Verifies that there is no race condition between StoreScanner construction and compaction. + * This is done through 3 injection points: + * 1. before seek operation in StoreScanner ctor + * 2. after seek operation in StoreScanner ctor + * 3. after compaction completion + */ + public void testCompactionRaceCondition() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + util.startMiniCluster(1); + byte[] t = Bytes.toBytes("tbl"), cf = Bytes.toBytes("cf"); + HTable table = util.createTable(t, cf); + util.loadTable(table, cf); + util.flush(); + util.loadTable(table, cf); + util.flush(); + List regions = util.getHBaseCluster().getRegions(t); + assertTrue(regions.size() == 1); + HRegion r = regions.get(0); + Store s = r.getStore(cf); + + // Setup the injection handler. + StoreScannerCompactionRaceCondition ih = + new StoreScannerCompactionRaceCondition(s, 5); + InjectionHandler.set(ih); + + // Create a StoreScanner + TreeSet set = new TreeSet(Bytes.BYTES_COMPARATOR); + set.add(cf); + Scan scanSpec = new Scan(); + scanSpec.setStartRow(Bytes.toBytes("hjfsd")); + scanSpec.setStartRow(Bytes.toBytes("zjfsd")); + KeyValueScanner scanner = s.getScanner(scanSpec, set, s.getSmallestReadPoint()); + ih.f.get(); + + // Clear injection handling and shutdown the minicluster. + InjectionHandler.clear(); + scanner.close(); + util.shutdownMiniCluster(); + } + public void testDeleteMarkerLongevity() throws Exception { try { final long now = System.currentTimeMillis();