Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8439C106DA for ; Thu, 14 Nov 2013 19:18:51 +0000 (UTC) Received: (qmail 99299 invoked by uid 500); 14 Nov 2013 19:18:51 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 99187 invoked by uid 500); 14 Nov 2013 19:18:50 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 99180 invoked by uid 99); 14 Nov 2013 19:18:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Nov 2013 19:18:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Nov 2013 19:18:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 420B5238889B; Thu, 14 Nov 2013 19:18:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1542033 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/ Date: Thu, 14 Nov 2013 19:18:24 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131114191824.420B5238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Thu Nov 14 19:18:23 2013 New Revision: 1542033 URL: http://svn.apache.org/r1542033 Log: [0.89-fb] [HBASE-9967] delete store files when a flush aborts 
Author: pervyshev Test Plan: unit test Reviewers: aaiyer, liyintang, manukranthk, rshroff Reviewed By: aaiyer CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D1049445 Task ID: 3111537 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 14 19:18:23 2013 @@ -177,10 +177,10 @@ public class HRegion implements HeapSize // private byte [] name = null; protected final AtomicLong memstoreSize = new AtomicLong(0); - + // The number of rows are read protected final AtomicInteger rowReadCnt = new AtomicInteger(0); - + // The number of rows are updated protected final AtomicInteger rowUpdateCnt = new AtomicInteger(0); @@ -1576,6 +1576,9 @@ public class HRegion implements HeapSize status.abort("Flush failed: " + StringUtils.stringifyException(ioe)); // The caller can recover from this IOException. No harm done if // memstore flush fails. 
+ for (StoreFlusher flusher : storeFlushers) { + flusher.cancel(); + } throw ioe; } @@ -1917,7 +1920,7 @@ public class HRegion implements HeapSize public void delete(Map> familyMap, boolean writeToWAL) throws IOException { long now = EnvironmentEdgeManager.currentTimeMillis(); - + byte [] byteNow = Bytes.toBytes(now); boolean flush = false; @@ -2484,9 +2487,9 @@ public class HRegion implements HeapSize long seqNum) { // Increment the rowUpdatedCnt this.rowUpdateCnt.incrementAndGet(); - + long start = EnvironmentEdgeManager.currentTimeMillis(); - + MultiVersionConsistencyControl.WriteEntry w = null; long size = 0; try { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1542033&r1=1542032&r2=1542033&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 14 19:18:23 2013 @@ -722,12 +722,13 @@ public class Store extends SchemaConfigu private StoreFile flushCache(final long logCacheFlushId, SortedSet snapshot, TimeRangeTracker snapshotTimeRangeTracker, - MonitoredTask status) throws IOException { + MonitoredTask status, + CacheFlushInfo info) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
return internalFlushCache(snapshot, logCacheFlushId, - snapshotTimeRangeTracker, status); + snapshotTimeRangeTracker, status, info); } /* @@ -741,7 +742,8 @@ public class Store extends SchemaConfigu private StoreFile internalFlushCache(final SortedSet snapshot, final long logCacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, - MonitoredTask status) throws IOException { + MonitoredTask status, + CacheFlushInfo info) throws IOException { StoreFile.Writer writer; // Find the smallest read point across all the Scanners. long smallestReadPoint = region.getSmallestReadPoint(); @@ -770,6 +772,7 @@ public class Store extends SchemaConfigu status.setStatus("Flushing " + this + ": creating writer"); // A. Write the map out to the disk writer = createWriterInTmp(snapshot.size(), this.compression, false); + info.tmpPath = writer.getPath(); writer.setTimeRangeTracker(snapshotTimeRangeTracker); fileName = writer.getPath().getName(); try { @@ -801,6 +804,7 @@ public class Store extends SchemaConfigu writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false); status.setStatus("Flushing " + this + ": closing flushed file"); writer.close(); + InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE, writer.getPath()); } } } finally { @@ -814,28 +818,58 @@ public class Store extends SchemaConfigu // Write-out finished successfully, move into the right spot LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath); fs.rename(writer.getPath(), dstPath); + InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_RENAME, writer.getPath(), dstPath); + info.dstPath = dstPath; StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf, this.family.getBloomFilterType(), this.dataBlockEncoder); passSchemaMetricsTo(sf); StoreFile.Reader r = sf.createReader(); - this.storeSize += r.length(); + info.sequenceId = logCacheFlushId; + info.entries = r.getEntries(); + info.memSize = flushed; + info.fileSize = 
r.length(); + return sf; + } + + private boolean commit(StoreFile storeFile, SortedSet snapshot, CacheFlushInfo info) + throws IOException { + this.storeSize += info.fileSize; // This increments the metrics associated with total flushed bytes for this // family. The overall flush count is stored in the static metrics and // retrieved from HRegion.recentFlushes, which is set within // HRegion.internalFlushcache, which indirectly calls this to actually do // the flushing through the StoreFlusherImpl class getSchemaMetrics().updatePersistentStoreMetric( - SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushed); + SchemaMetrics.StoreMetricType.FLUSH_SIZE, info.memSize); if (LOG.isInfoEnabled()) { - LOG.info("Added " + sf + ", entries=" + r.getEntries() + - ", sequenceid=" + logCacheFlushId + - ", memsize=" + StringUtils.humanReadableInt(flushed) + - ", filesize=" + StringUtils.humanReadableInt(r.length()) + + LOG.info("Added " + storeFile + ", entries=" + info.entries + + ", sequenceid=" + info.sequenceId + + ", memsize=" + StringUtils.humanReadableInt(info.memSize) + + ", filesize=" + StringUtils.humanReadableInt(info.fileSize) + " to " + this.region.regionInfo.getRegionNameAsString()); } - return sf; + // Add new file to store files. Clear snapshot too while we have + // the Store write lock. 
+ return updateStorefiles(storeFile, snapshot); + } + + private void cancel(CacheFlushInfo info) { + if (info.tmpPath != null) { + try { + fs.delete(info.tmpPath, false); + } catch (IOException e) { + // that's ok + } + } + if (info.dstPath != null) { + try { + fs.delete(info.dstPath, false); + } catch (IOException e) { + // that's ok + } + } } /* @@ -1949,6 +1983,16 @@ public class Store extends SchemaConfigu return new StoreFlusherImpl(cacheFlushId); } + private class CacheFlushInfo { + Path tmpPath; + Path dstPath; + + long entries; + long sequenceId; + long memSize; + long fileSize; + } + private class StoreFlusherImpl implements StoreFlusher { private long cacheFlushId; @@ -1956,6 +2000,8 @@ public class Store extends SchemaConfigu private StoreFile storeFile; private TimeRangeTracker snapshotTimeRangeTracker; + CacheFlushInfo cacheFlushInfo; + private StoreFlusherImpl(long cacheFlushId) { this.cacheFlushId = cacheFlushId; } @@ -1965,12 +2011,13 @@ public class Store extends SchemaConfigu memstore.snapshot(); this.snapshot = memstore.getSnapshot(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); + this.cacheFlushInfo = new CacheFlushInfo(); } @Override public void flushCache(MonitoredTask status) throws IOException { storeFile = Store.this.flushCache(cacheFlushId, snapshot, - snapshotTimeRangeTracker, status); + snapshotTimeRangeTracker, status, cacheFlushInfo); } @Override @@ -1978,9 +2025,12 @@ public class Store extends SchemaConfigu if (storeFile == null) { return false; } - // Add new file to store files. Clear snapshot too while we have - // the Store write lock. 
- return Store.this.updateStorefiles(storeFile, snapshot); + return Store.this.commit(storeFile, snapshot, cacheFlushInfo); + } + + @Override + public void cancel() { + Store.this.cancel(cacheFlushInfo); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1542033&r1=1542032&r2=1542033&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Thu Nov 14 19:18:23 2013 @@ -63,4 +63,9 @@ interface StoreFlusher { */ boolean commit() throws IOException; + /** + * Cancel the flush - remove files from file system + */ + void cancel(); + } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542033&r1=1542032&r2=1542033&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Thu Nov 14 19:18:23 2013 @@ -39,5 +39,7 @@ public enum InjectionEvent { // Injection into Store.java READONLYSTORE_COMPACTION_WHILE_SNAPSHOTTING, - STORESCANNER_COMPACTION_RACE + STORESCANNER_COMPACTION_RACE, + STOREFILE_AFTER_WRITE_CLOSE, + STOREFILE_AFTER_RENAME } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Nov 14 19:18:23 2013 @@ -23,8 +23,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -66,6 +68,8 @@ import org.apache.hadoop.hbase.util.Envi import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -1468,7 +1472,7 @@ public class TestHRegion extends HBaseTe // Setting up region String method = this.getName(); byte[] tableName = Bytes.toBytes("testtableNextRows"); - byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"), + byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"), Bytes.toBytes("rows3")}; byte[][] families = { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), Bytes.toBytes("fam4") }; @@ -1478,8 +1482,8 @@ public class TestHRegion extends HBaseTe List expected = new ArrayList(); fillTable(rows, families, 2, expected); /** - * in this case we know kv size = 28 - * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE + * in this case 
we know kv size = 28 + * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE * --4-|--4-|--2---|---4---|--1--|--4---|-8-|--1-- ===> 28 bytes */ Scan scan = new Scan(); @@ -1488,23 +1492,23 @@ public class TestHRegion extends HBaseTe scan.addFamily(families[3]); // fetch one kv even when responseSize = 0, oh well, this's the semantic - // that users should be aware of + // that users should be aware of compareNextRows(scan, 0, true, Integer.MAX_VALUE, expected.subList(0, 1)); // fetch the last kv pair if the responseSize is not big enough compareNextRows(scan, 1, true, Integer.MAX_VALUE, expected.subList(0, 1)); - // maxResponseSize perfectly fits one kv + // maxResponseSize perfectly fits one kv compareNextRows(scan, 28, true, Integer.MAX_VALUE, expected.subList(0, 1)); - // if partialRow == true, fetch as much as maxResponseSize allows + // if partialRow == true, fetch as much as maxResponseSize allows compareNextRows(scan, 29, true, Integer.MAX_VALUE, expected.subList(0, 2)); - // if partialRow == false, fetch the entire row + // if partialRow == false, fetch the entire row compareNextRows(scan, 29, false, Integer.MAX_VALUE, expected.subList(0, 6)); - + // fetch everything in the table as long as responseSize is big enough compareNextRows(scan, 10000, true, Integer.MAX_VALUE, expected); compareNextRows(scan, 10000, false, Integer.MAX_VALUE, expected); - - // check nbRows + + // check nbRows // fetch two rows, each has two columns and each column has 3 kvs compareNextRows(scan, 10000, true, 2, expected.subList(0, 12)); compareNextRows(scan, 10000, false, 2, expected.subList(0, 12)); @@ -2977,6 +2981,72 @@ public class TestHRegion extends HBaseTe assertTrue(keyValues.length == 0); } + public void testRemoveStoreFilesOnWriteFailure() + throws IOException { + byte[] table = Bytes.toBytes("table"); + byte[][] families = new byte[][] { + Bytes.toBytes("family1"), + Bytes.toBytes("family2"), + Bytes.toBytes("family3") + }; + initHRegion(table, getName(), families); + + Put 
put = new Put(Bytes.toBytes("row")); + put.add(families[0], null, null); + put.add(families[1], null, null); + put.add(families[2], null, null); + region.put(put); + + class InjectionHandlerImpl extends InjectionHandler { + + private Set<Path> paths = new HashSet<Path>(); + + private int writeCount = 0; + + protected void _processEventIO(InjectionEvent event, Object... args) + throws IOException { + switch (event) { + case STOREFILE_AFTER_WRITE_CLOSE: + { + paths.add((Path) args[0]); + if (++writeCount == 2) { + throw new IOException(); + } + break; + } + case STOREFILE_AFTER_RENAME: + { + paths.add((Path) args[1]); + break; + } + } + } + + public void validate() + throws IOException { + for (Path path : paths) { + assertFalse("file should not exist: " + path, region.fs.exists(path)); + } + } + + } + + InjectionHandlerImpl ih = new InjectionHandlerImpl(); + InjectionHandler.set(ih); + + try { + region.flushcache(); + fail(); + } + catch (IOException e) { + // that's expected + } + + ih.validate(); + + InjectionHandler.clear(); + } + private void putData(int startRow, int numRows, byte [] qf, byte [] ...families) throws IOException {