Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 36166200D53 for ; Mon, 20 Nov 2017 17:15:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 34D9A160BEC; Mon, 20 Nov 2017 16:15:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5C4F3160C15 for ; Mon, 20 Nov 2017 17:15:25 +0100 (CET) Received: (qmail 89929 invoked by uid 500); 20 Nov 2017 16:15:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 89641 invoked by uid 99); 20 Nov 2017 16:15:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Nov 2017 16:15:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DB659F5F68; Mon, 20 Nov 2017 16:15:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 20 Nov 2017 16:15:31 -0000 Message-Id: <46811f2dcd7342a0acf780ad8c196c2b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/17] hbase-site git commit: Published site at . archived-at: Mon, 20 Nov 2017 16:15:27 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/db363ec6/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestHStore.FaultyFileSystem.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestHStore.FaultyFileSystem.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestHStore.FaultyFileSystem.html index dfba8d0..bf3143b 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestHStore.FaultyFileSystem.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestHStore.FaultyFileSystem.html @@ -1473,379 +1473,384 @@ 1465 int flushSize = 500; 1466 Configuration conf = HBaseConfiguration.create(); 1467 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); -1468 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); -1469 // Set the lower threshold to invoke the "MERGE" policy -1470 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); -1471 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) -1472 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); -1473 byte[] value = Bytes.toBytes("thisisavarylargevalue"); -1474 MemStoreSizing memStoreSizing = new MemStoreSizing(); -1475 long ts = EnvironmentEdgeManager.currentTime(); -1476 long seqId = 100; -1477 // older data whihc shouldn't be "seen" by client -1478 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); -1479 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); -1480 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); -1481 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); -1482 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); -1483 storeFlushCtx.prepare(); -1484 // This shouldn't invoke another in-memory flush because the first compactor thread -1485 // hasn't accomplished the in-memory compaction. -1486 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); -1487 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); +1468 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); +1469 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); +1470 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); +1471 // Set the lower threshold to invoke the "MERGE" policy +1472 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); +1473 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) +1474 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); +1475 byte[] value = Bytes.toBytes("thisisavarylargevalue"); +1476 MemStoreSizing memStoreSizing = new MemStoreSizing(); +1477 long ts = EnvironmentEdgeManager.currentTime(); +1478 long seqId = 100; +1479 // older data whihc shouldn't be "seen" by client +1480 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); +1481 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); +1482 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); +1483 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); +1484 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); +1485 storeFlushCtx.prepare(); +1486 // This shouldn't invoke another in-memory flush because the first compactor thread +1487 // hasn't accomplished the in-memory compaction. 1488 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); -1489 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); -1490 //okay. Let the compaction be completed -1491 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); -1492 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; -1493 while (mem.isMemStoreFlushingInMemory()) { -1494 TimeUnit.SECONDS.sleep(1); -1495 } -1496 // This should invoke another in-memory flush. -1497 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); -1498 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); +1489 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); +1490 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); +1491 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); +1492 //okay. Let the compaction be completed +1493 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); +1494 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; +1495 while (mem.isMemStoreFlushingInMemory()) { +1496 TimeUnit.SECONDS.sleep(1); +1497 } +1498 // This should invoke another in-memory flush. 1499 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); -1500 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); -1501 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, -1502 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); -1503 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); -1504 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); -1505 } -1506 -1507 @Test -1508 public void testAge() throws IOException { -1509 long currentTime = System.currentTimeMillis(); -1510 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); -1511 edge.setValue(currentTime); -1512 EnvironmentEdgeManager.injectEdge(edge); -1513 Configuration conf = TEST_UTIL.getConfiguration(); -1514 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); -1515 initHRegion(name.getMethodName(), conf, -1516 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); -1517 HStore store = new HStore(region, hcd, conf) { -1518 -1519 @Override -1520 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, -1521 CellComparator kvComparator) throws IOException { -1522 List<HStoreFile> storefiles = -1523 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), -1524 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); -1525 StoreFileManager sfm = mock(StoreFileManager.class); -1526 when(sfm.getStorefiles()).thenReturn(storefiles); -1527 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); -1528 when(storeEngine.getStoreFileManager()).thenReturn(sfm); -1529 return storeEngine; -1530 } -1531 }; -1532 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); -1533 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); -1534 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); -1535 } -1536 -1537 private HStoreFile mockStoreFile(long createdTime) { -1538 StoreFileInfo info = mock(StoreFileInfo.class); -1539 when(info.getCreatedTimestamp()).thenReturn(createdTime); -1540 HStoreFile sf = mock(HStoreFile.class); -1541 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); -1542 when(sf.isHFile()).thenReturn(true); -1543 when(sf.getFileInfo()).thenReturn(info); -1544 return sf; -1545 } -1546 -1547 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) -1548 throws IOException { -1549 return (MyStore) init(methodName, conf, -1550 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), -1551 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); -1552 } -1553 -1554 private class MyStore extends HStore { -1555 private final MyStoreHook hook; -1556 -1557 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration -1558 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { -1559 super(region, family, confParam); -1560 this.hook = hook; -1561 } -1562 -1563 @Override -1564 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, -1565 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, -1566 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, -1567 boolean includeMemstoreScanner) throws IOException { -1568 hook.getScanners(this); -1569 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, -1570 stopRow, false, readPt, includeMemstoreScanner); -1571 } -1572 -1573 @Override -1574 public long getSmallestReadPoint() { -1575 return hook.getSmallestReadPoint(this); -1576 } -1577 } -1578 -1579 private abstract class MyStoreHook { +1500 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); +1501 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); +1502 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); +1503 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, +1504 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); +1505 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); +1506 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); +1507 } +1508 +1509 @Test +1510 public void testAge() throws IOException { +1511 long currentTime = System.currentTimeMillis(); +1512 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); +1513 edge.setValue(currentTime); +1514 EnvironmentEdgeManager.injectEdge(edge); +1515 Configuration conf = TEST_UTIL.getConfiguration(); +1516 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); +1517 initHRegion(name.getMethodName(), conf, +1518 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); +1519 HStore store = new HStore(region, hcd, conf) { +1520 +1521 @Override +1522 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, +1523 CellComparator kvComparator) throws IOException { +1524 List<HStoreFile> storefiles = +1525 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), +1526 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); +1527 StoreFileManager sfm = mock(StoreFileManager.class); +1528 when(sfm.getStorefiles()).thenReturn(storefiles); +1529 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); +1530 when(storeEngine.getStoreFileManager()).thenReturn(sfm); +1531 return storeEngine; +1532 } +1533 }; +1534 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); +1535 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); +1536 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); +1537 } +1538 +1539 private HStoreFile mockStoreFile(long createdTime) { +1540 StoreFileInfo info = mock(StoreFileInfo.class); +1541 when(info.getCreatedTimestamp()).thenReturn(createdTime); +1542 HStoreFile sf = mock(HStoreFile.class); +1543 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); +1544 when(sf.isHFile()).thenReturn(true); +1545 when(sf.getFileInfo()).thenReturn(info); +1546 return sf; +1547 } +1548 +1549 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) +1550 throws IOException { +1551 return (MyStore) init(methodName, conf, +1552 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), +1553 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); +1554 } +1555 +1556 private class MyStore extends HStore { +1557 private final MyStoreHook hook; +1558 +1559 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration +1560 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { +1561 super(region, family, confParam); +1562 this.hook = hook; +1563 } +1564 +1565 @Override +1566 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, +1567 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, +1568 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, +1569 boolean includeMemstoreScanner) throws IOException { +1570 hook.getScanners(this); +1571 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, +1572 stopRow, false, readPt, includeMemstoreScanner); +1573 } +1574 +1575 @Override +1576 public long getSmallestReadPoint() { +1577 return hook.getSmallestReadPoint(this); +1578 } +1579 } 1580 -1581 void getScanners(MyStore store) throws IOException { -1582 } -1583 -1584 long getSmallestReadPoint(HStore store) { -1585 return store.getHRegion().getSmallestReadPoint(); -1586 } -1587 } -1588 -1589 @Test -1590 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { -1591 Configuration conf = HBaseConfiguration.create(); -1592 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); -1593 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); -1594 // Set the lower threshold to invoke the "MERGE" policy -1595 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); -1596 MemStoreSizing memStoreSizing = new MemStoreSizing(); -1597 long ts = System.currentTimeMillis(); -1598 long seqID = 1l; -1599 // Add some data to the region and do some flushes -1600 for (int i = 1; i < 10; i++) { -1601 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), -1602 memStoreSizing); -1603 } -1604 // flush them -1605 flushStore(store, seqID); -1606 for (int i = 11; i < 20; i++) { -1607 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), -1608 memStoreSizing); -1609 } -1610 // flush them -1611 flushStore(store, seqID); -1612 for (int i = 21; i < 30; i++) { -1613 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), -1614 memStoreSizing); -1615 } -1616 // flush them -1617 flushStore(store, seqID); -1618 -1619 assertEquals(3, store.getStorefilesCount()); -1620 Scan scan = new Scan(); -1621 scan.addFamily(family); -1622 Collection<HStoreFile> storefiles2 = store.getStorefiles(); -1623 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); -1624 StoreScanner storeScanner = -1625 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); -1626 // get the current heap -1627 KeyValueHeap heap = storeScanner.heap; -1628 // create more store files -1629 for (int i = 31; i < 40; i++) { -1630 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), -1631 memStoreSizing); -1632 } -1633 // flush them -1634 flushStore(store, seqID); -1635 -1636 for (int i = 41; i < 50; i++) { -1637 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), -1638 memStoreSizing); -1639 } -1640 // flush them -1641 flushStore(store, seqID); -1642 storefiles2 = store.getStorefiles(); -1643 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); -1644 actualStorefiles1.removeAll(actualStorefiles); -1645 // Do compaction -1646 MyThread thread = new MyThread(storeScanner); -1647 thread.start(); -1648 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); -1649 thread.join(); -1650 KeyValueHeap heap2 = thread.getHeap(); -1651 assertFalse(heap.equals(heap2)); -1652 } -1653 -1654 private static class MyThread extends Thread { -1655 private StoreScanner scanner; -1656 private KeyValueHeap heap; -1657 -1658 public MyThread(StoreScanner scanner) { -1659 this.scanner = scanner; -1660 } -1661 -1662 public KeyValueHeap getHeap() { -1663 return this.heap; -1664 } -1665 -1666 public void run() { -1667 scanner.trySwitchToStreamRead(); -1668 heap = scanner.heap; -1669 } -1670 } -1671 -1672 private static class MyMemStoreCompactor extends MemStoreCompactor { -1673 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); -1674 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); -1675 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy -1676 compactionPolicy) throws IllegalArgumentIOException { -1677 super(compactingMemStore, compactionPolicy); -1678 } -1679 -1680 @Override -1681 public boolean start() throws IOException { -1682 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; -1683 boolean rval = super.start(); -1684 if (isFirst) { -1685 try { -1686 START_COMPACTOR_LATCH.await(); -1687 } catch (InterruptedException ex) { -1688 throw new RuntimeException(ex); -1689 } -1690 } -1691 return rval; -1692 } -1693 } -1694 -1695 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { -1696 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); -1697 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, -1698 HStore store, RegionServicesForStores regionServices, -1699 MemoryCompactionPolicy compactionPolicy) throws IOException { -1700 super(conf, c, store, regionServices, compactionPolicy); -1701 } -1702 -1703 @Override -1704 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) -1705 throws IllegalArgumentIOException { -1706 return new MyMemStoreCompactor(this, compactionPolicy); -1707 } -1708 -1709 @Override -1710 protected boolean shouldFlushInMemory() { -1711 boolean rval = super.shouldFlushInMemory(); -1712 if (rval) { -1713 RUNNER_COUNT.incrementAndGet(); -1714 } -1715 return rval; -1716 } -1717 } -1718 -1719 public static class MyCompactingMemStore extends CompactingMemStore { -1720 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); -1721 private final CountDownLatch getScannerLatch = new CountDownLatch(1); -1722 private final CountDownLatch snapshotLatch = new CountDownLatch(1); -1723 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, -1724 HStore store, RegionServicesForStores regionServices, -1725 MemoryCompactionPolicy compactionPolicy) throws IOException { -1726 super(conf, c, store, regionServices, compactionPolicy); -1727 } -1728 -1729 @Override -1730 protected List<KeyValueScanner> createList(int capacity) { -1731 if (START_TEST.get()) { -1732 try { -1733 getScannerLatch.countDown(); -1734 snapshotLatch.await(); -1735 } catch (InterruptedException e) { -1736 throw new RuntimeException(e); -1737 } -1738 } -1739 return new ArrayList<>(capacity); -1740 } -1741 @Override -1742 protected void pushActiveToPipeline(MutableSegment active) { -1743 if (START_TEST.get()) { -1744 try { -1745 getScannerLatch.await(); -1746 } catch (InterruptedException e) { -1747 throw new RuntimeException(e); -1748 } -1749 } -1750 -1751 super.pushActiveToPipeline(active); -1752 if (START_TEST.get()) { -1753 snapshotLatch.countDown(); +1581 private abstract class MyStoreHook { +1582 +1583 void getScanners(MyStore store) throws IOException { +1584 } +1585 +1586 long getSmallestReadPoint(HStore store) { +1587 return store.getHRegion().getSmallestReadPoint(); +1588 } +1589 } +1590 +1591 @Test +1592 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { +1593 Configuration conf = HBaseConfiguration.create(); +1594 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); +1595 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); +1596 // Set the lower threshold to invoke the "MERGE" policy +1597 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); +1598 MemStoreSizing memStoreSizing = new MemStoreSizing(); +1599 long ts = System.currentTimeMillis(); +1600 long seqID = 1l; +1601 // Add some data to the region and do some flushes +1602 for (int i = 1; i < 10; i++) { +1603 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), +1604 memStoreSizing); +1605 } +1606 // flush them +1607 flushStore(store, seqID); +1608 for (int i = 11; i < 20; i++) { +1609 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), +1610 memStoreSizing); +1611 } +1612 // flush them +1613 flushStore(store, seqID); +1614 for (int i = 21; i < 30; i++) { +1615 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), +1616 memStoreSizing); +1617 } +1618 // flush them +1619 flushStore(store, seqID); +1620 +1621 assertEquals(3, store.getStorefilesCount()); +1622 Scan scan = new Scan(); +1623 scan.addFamily(family); +1624 Collection<HStoreFile> storefiles2 = store.getStorefiles(); +1625 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); +1626 StoreScanner storeScanner = +1627 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); +1628 // get the current heap +1629 KeyValueHeap heap = storeScanner.heap; +1630 // create more store files +1631 for (int i = 31; i < 40; i++) { +1632 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), +1633 memStoreSizing); +1634 } +1635 // flush them +1636 flushStore(store, seqID); +1637 +1638 for (int i = 41; i < 50; i++) { +1639 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), +1640 memStoreSizing); +1641 } +1642 // flush them +1643 flushStore(store, seqID); +1644 storefiles2 = store.getStorefiles(); +1645 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); +1646 actualStorefiles1.removeAll(actualStorefiles); +1647 // Do compaction +1648 MyThread thread = new MyThread(storeScanner); +1649 thread.start(); +1650 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); +1651 thread.join(); +1652 KeyValueHeap heap2 = thread.getHeap(); +1653 assertFalse(heap.equals(heap2)); +1654 } +1655 +1656 private static class MyThread extends Thread { +1657 private StoreScanner scanner; +1658 private KeyValueHeap heap; +1659 +1660 public MyThread(StoreScanner scanner) { +1661 this.scanner = scanner; +1662 } +1663 +1664 public KeyValueHeap getHeap() { +1665 return this.heap; +1666 } +1667 +1668 public void run() { +1669 scanner.trySwitchToStreamRead(); +1670 heap = scanner.heap; +1671 } +1672 } +1673 +1674 private static class MyMemStoreCompactor extends MemStoreCompactor { +1675 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); +1676 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); +1677 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy +1678 compactionPolicy) throws IllegalArgumentIOException { +1679 super(compactingMemStore, compactionPolicy); +1680 } +1681 +1682 @Override +1683 public boolean start() throws IOException { +1684 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; +1685 boolean rval = super.start(); +1686 if (isFirst) { +1687 try { +1688 START_COMPACTOR_LATCH.await(); +1689 } catch (InterruptedException ex) { +1690 throw new RuntimeException(ex); +1691 } +1692 } +1693 return rval; +1694 } +1695 } +1696 +1697 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { +1698 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); +1699 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, +1700 HStore store, RegionServicesForStores regionServices, +1701 MemoryCompactionPolicy compactionPolicy) throws IOException { +1702 super(conf, c, store, regionServices, compactionPolicy); +1703 } +1704 +1705 @Override +1706 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) +1707 throws IllegalArgumentIOException { +1708 return new MyMemStoreCompactor(this, compactionPolicy); +1709 } +1710 +1711 @Override +1712 protected boolean shouldFlushInMemory() { +1713 boolean rval = super.shouldFlushInMemory(); +1714 if (rval) { +1715 RUNNER_COUNT.incrementAndGet(); +1716 if (LOG.isDebugEnabled()) { +1717 LOG.debug("runner count: " + RUNNER_COUNT.get()); +1718 } +1719 } +1720 return rval; +1721 } +1722 } +1723 +1724 public static class MyCompactingMemStore extends CompactingMemStore { +1725 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); +1726 private final CountDownLatch getScannerLatch = new CountDownLatch(1); +1727 private final CountDownLatch snapshotLatch = new CountDownLatch(1); +1728 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, +1729 HStore store, RegionServicesForStores regionServices, +1730 MemoryCompactionPolicy compactionPolicy) throws IOException { +1731 super(conf, c, store, regionServices, compactionPolicy); +1732 } +1733 +1734 @Override +1735 protected List<KeyValueScanner> createList(int capacity) { +1736 if (START_TEST.get()) { +1737 try { +1738 getScannerLatch.countDown(); +1739 snapshotLatch.await(); +1740 } catch (InterruptedException e) { +1741 throw new RuntimeException(e); +1742 } +1743 } +1744 return new ArrayList<>(capacity); +1745 } +1746 @Override +1747 protected void pushActiveToPipeline(MutableSegment active) { +1748 if (START_TEST.get()) { +1749 try { +1750 getScannerLatch.await(); +1751 } catch (InterruptedException e) { +1752 throw new RuntimeException(e); +1753 } 1754 } -1755 } -1756 } -1757 -1758 interface MyListHook { -1759 void hook(int currentSize); -1760 } -1761 -1762 private static class MyList<T> implements List<T> { -1763 private final List<T> delegatee = new ArrayList<>(); -1764 private final MyListHook hookAtAdd; -1765 MyList(final MyListHook hookAtAdd) { -1766 this.hookAtAdd = hookAtAdd; -1767 } -1768 @Override -1769 public int size() {return delegatee.size();} -1770 -1771 @Override -1772 public boolean isEmpty() {return delegatee.isEmpty();} -1773 -1774 @Override -1775 public boolean contains(Object o) {return delegatee.contains(o);} -1776 -1777 @Override -1778 public Iterator<T> iterator() {return delegatee.iterator();} -1779 -1780 @Override -1781 public Object[] toArray() {return delegatee.toArray();} -1782 -1783 @Override -1784 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} -1785 -1786 @Override -1787 public boolean add(T e) { -1788 hookAtAdd.hook(size()); -1789 return delegatee.add(e); -1790 } -1791 -1792 @Override -1793 public boolean remove(Object o) {return delegatee.remove(o);} -1794 -1795 @Override -1796 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} -1797 -1798 @Override -1799 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} -1800 -1801 @Override -1802 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} -1803 -1804 @Override -1805 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} -1806 -1807 @Override -1808 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} -1809 -1810 @Override -1811 public void clear() {delegatee.clear();} -1812 -1813 @Override -1814 public T get(int index) {return delegatee.get(index);} -1815 -1816 @Override -1817 public T set(int index, T element) {return delegatee.set(index, element);} -1818 -1819 @Override -1820 public void add(int index, T element) {delegatee.add(index, element);} -1821 -1822 @Override -1823 public T remove(int index) {return delegatee.remove(index);} -1824 -1825 @Override -1826 public int indexOf(Object o) {return delegatee.indexOf(o);} -1827 -1828 @Override -1829 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} -1830 -1831 @Override -1832 public ListIterator<T> listIterator() {return delegatee.listIterator();} -1833 -1834 @Override -1835 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} -1836 -1837 @Override -1838 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} -1839 } -1840} +1755 +1756 super.pushActiveToPipeline(active); +1757 if (START_TEST.get()) { +1758 snapshotLatch.countDown(); +1759 } +1760 } +1761 } +1762 +1763 interface MyListHook { +1764 void hook(int currentSize); +1765 } +1766 +1767 private static class MyList<T> implements List<T> { +1768 private final List<T> delegatee = new ArrayList<>(); +1769 private final MyListHook hookAtAdd; +1770 MyList(final MyListHook hookAtAdd) { +1771 this.hookAtAdd = hookAtAdd; +1772 } +1773 @Override +1774 public int size() {return delegatee.size();} +1775 +1776 @Override +1777 public boolean isEmpty() {return delegatee.isEmpty();} +1778 +1779 @Override +1780 public boolean contains(Object o) {return delegatee.contains(o);} +1781 +1782 @Override +1783 public Iterator<T> iterator() {return delegatee.iterator();} +1784 +1785 @Override +1786 public Object[] toArray() {return delegatee.toArray();} +1787 +1788 @Override +1789 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} +1790 +1791 @Override +1792 public boolean add(T e) { +1793 hookAtAdd.hook(size()); +1794 return delegatee.add(e); +1795 } +1796 +1797 @Override +1798 public boolean remove(Object o) {return delegatee.remove(o);} +1799 +1800 @Override +1801 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} +1802 +1803 @Override +1804 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} +1805 +1806 @Override +1807 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} +1808 +1809 @Override +1810 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} +1811 +1812 @Override +1813 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} +1814 +1815 @Override +1816 public void clear() {delegatee.clear();} +1817 +1818 @Override +1819 public T get(int index) {return delegatee.get(index);} +1820 +1821 @Override +1822 public T set(int index, T element) {return delegatee.set(index, element);} +1823 +1824 @Override +1825 public void add(int index, T element) {delegatee.add(index, element);} +1826 +1827 @Override +1828 public T remove(int index) {return delegatee.remove(index);} +1829 +1830 @Override +1831 public int indexOf(Object o) {return delegatee.indexOf(o);} +1832 +1833 @Override +1834 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} +1835 +1836 @Override +1837 public ListIterator<T> listIterator() {return delegatee.listIterator();} +1838 +1839 @Override +1840 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} +1841 +1842 @Override +1843 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} +1844 } +1845}