Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B163818B09 for ; Tue, 22 Dec 2015 23:24:29 +0000 (UTC) Received: (qmail 76591 invoked by uid 500); 22 Dec 2015 23:24:25 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 76488 invoked by uid 500); 22 Dec 2015 23:24:25 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 75176 invoked by uid 99); 22 Dec 2015 23:24:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Dec 2015 23:24:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8FEFEE0AFD; Tue, 22 Dec 2015 23:24:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 22 Dec 2015 23:24:41 -0000 Message-Id: <5a252407d0dc4e609a401640065dbc23@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/51] [partial] hbase-site git commit: Published site at 95a13b51ee052eb73882682e8f009bfa1e914866. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/32d40534/devapidocs/src-html/org/apache/hadoop/hbase/wal/WALSplitter.LogRecoveredEditsOutputSink.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/wal/WALSplitter.LogRecoveredEditsOutputSink.html b/devapidocs/src-html/org/apache/hadoop/hbase/wal/WALSplitter.LogRecoveredEditsOutputSink.html index f3aed1b..0597160 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/wal/WALSplitter.LogRecoveredEditsOutputSink.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/wal/WALSplitter.LogRecoveredEditsOutputSink.html @@ -1513,810 +1513,816 @@ 1505 if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) { 1506 return; 1507 } -1508 List<Cell> skippedCells = new ArrayList<Cell>(); -1509 for (Cell cell : logEntry.getEdit().getCells()) { -1510 if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { -1511 byte[] family = CellUtil.cloneFamily(cell); -1512 Long maxSeqId = maxSeqIdInStores.get(family); -1513 // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, -1514 // or the master was crashed before and we can not get the information. -1515 if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) { -1516 skippedCells.add(cell); -1517 } -1518 } -1519 } -1520 if (!skippedCells.isEmpty()) { -1521 logEntry.getEdit().getCells().removeAll(skippedCells); -1522 } -1523 } +1508 // Create the array list for the cells that aren't filtered. +1509 // We make the assumption that most cells will be kept. +1510 ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size()); +1511 for (Cell cell : logEntry.getEdit().getCells()) { +1512 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { +1513 keptCells.add(cell); +1514 } else { +1515 byte[] family = CellUtil.cloneFamily(cell); +1516 Long maxSeqId = maxSeqIdInStores.get(family); +1517 // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, +1518 // or the master was crashed before and we can not get the information. +1519 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) { +1520 keptCells.add(cell); +1521 } +1522 } +1523 } 1524 -1525 @Override -1526 public void append(RegionEntryBuffer buffer) throws IOException { -1527 List<Entry> entries = buffer.entryBuffer; -1528 if (entries.isEmpty()) { -1529 LOG.warn("got an empty buffer, skipping"); -1530 return; -1531 } -1532 -1533 WriterAndPath wap = null; -1534 -1535 long startTime = System.nanoTime(); -1536 try { -1537 int editsCount = 0; +1525 // Anything in the keptCells array list is still live. +1526 // So rather than removing the cells from the array list +1527 // which would be an O(n^2) operation, we just replace the list +1528 logEntry.getEdit().setCells(keptCells); +1529 } +1530 +1531 @Override +1532 public void append(RegionEntryBuffer buffer) throws IOException { +1533 List<Entry> entries = buffer.entryBuffer; +1534 if (entries.isEmpty()) { +1535 LOG.warn("got an empty buffer, skipping"); +1536 return; +1537 } 1538 -1539 for (Entry logEntry : entries) { -1540 if (wap == null) { -1541 wap = getWriterAndPath(logEntry); -1542 if (wap == null) { -1543 if (LOG.isDebugEnabled()) { -1544 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry); -1545 } -1546 return; -1547 } -1548 } -1549 filterCellByStore(logEntry); -1550 if (!logEntry.getEdit().isEmpty()) { -1551 wap.w.append(logEntry); -1552 this.updateRegionMaximumEditLogSeqNum(logEntry); -1553 editsCount++; -1554 } else { -1555 wap.incrementSkippedEdits(1); -1556 } -1557 } -1558 // Pass along summary statistics -1559 wap.incrementEdits(editsCount); -1560 wap.incrementNanoTime(System.nanoTime() - startTime); -1561 } catch (IOException e) { -1562 e = e instanceof RemoteException ? -1563 ((RemoteException)e).unwrapRemoteException() : e; -1564 LOG.fatal(" Got while writing log entry to log", e); -1565 throw e; -1566 } -1567 } -1568 -1569 /** -1570 * @return a map from encoded region ID to the number of edits written out for that region. -1571 */ -1572 @Override -1573 public Map<byte[], Long> getOutputCounts() { -1574 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); -1575 synchronized (writers) { -1576 for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) { -1577 ret.put(entry.getKey(), entry.getValue().editsWritten); -1578 } -1579 } -1580 return ret; -1581 } -1582 -1583 @Override -1584 public int getNumberOfRecoveredRegions() { -1585 return writers.size(); -1586 } -1587 } +1539 WriterAndPath wap = null; +1540 +1541 long startTime = System.nanoTime(); +1542 try { +1543 int editsCount = 0; +1544 +1545 for (Entry logEntry : entries) { +1546 if (wap == null) { +1547 wap = getWriterAndPath(logEntry); +1548 if (wap == null) { +1549 if (LOG.isDebugEnabled()) { +1550 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry); +1551 } +1552 return; +1553 } +1554 } +1555 filterCellByStore(logEntry); +1556 if (!logEntry.getEdit().isEmpty()) { +1557 wap.w.append(logEntry); +1558 this.updateRegionMaximumEditLogSeqNum(logEntry); +1559 editsCount++; +1560 } else { +1561 wap.incrementSkippedEdits(1); +1562 } +1563 } +1564 // Pass along summary statistics +1565 wap.incrementEdits(editsCount); +1566 wap.incrementNanoTime(System.nanoTime() - startTime); +1567 } catch (IOException e) { +1568 e = e instanceof RemoteException ? +1569 ((RemoteException)e).unwrapRemoteException() : e; +1570 LOG.fatal(" Got while writing log entry to log", e); +1571 throw e; +1572 } +1573 } +1574 +1575 /** +1576 * @return a map from encoded region ID to the number of edits written out for that region. +1577 */ +1578 @Override +1579 public Map<byte[], Long> getOutputCounts() { +1580 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); +1581 synchronized (writers) { +1582 for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) { +1583 ret.put(entry.getKey(), entry.getValue().editsWritten); +1584 } +1585 } +1586 return ret; +1587 } 1588 -1589 /** -1590 * Class wraps the actual writer which writes data out and related statistics -1591 */ -1592 public abstract static class SinkWriter { -1593 /* Count of edits written to this path */ -1594 long editsWritten = 0; -1595 /* Count of edits skipped to this path */ -1596 long editsSkipped = 0; -1597 /* Number of nanos spent writing to this log */ -1598 long nanosSpent = 0; -1599 -1600 void incrementEdits(int edits) { -1601 editsWritten += edits; -1602 } -1603 -1604 void incrementSkippedEdits(int skipped) { -1605 editsSkipped += skipped; -1606 } -1607 -1608 void incrementNanoTime(long nanos) { -1609 nanosSpent += nanos; -1610 } -1611 } -1612 -1613 /** -1614 * Private data structure that wraps a Writer and its Path, also collecting statistics about the -1615 * data written to this output. -1616 */ -1617 private final static class WriterAndPath extends SinkWriter { -1618 final Path p; -1619 final Writer w; -1620 -1621 WriterAndPath(final Path p, final Writer w) { -1622 this.p = p; -1623 this.w = w; -1624 } -1625 } +1589 @Override +1590 public int getNumberOfRecoveredRegions() { +1591 return writers.size(); +1592 } +1593 } +1594 +1595 /** +1596 * Class wraps the actual writer which writes data out and related statistics +1597 */ +1598 public abstract static class SinkWriter { +1599 /* Count of edits written to this path */ +1600 long editsWritten = 0; +1601 /* Count of edits skipped to this path */ +1602 long editsSkipped = 0; +1603 /* Number of nanos spent writing to this log */ +1604 long nanosSpent = 0; +1605 +1606 void incrementEdits(int edits) { +1607 editsWritten += edits; +1608 } +1609 +1610 void incrementSkippedEdits(int skipped) { +1611 editsSkipped += skipped; +1612 } +1613 +1614 void incrementNanoTime(long nanos) { +1615 nanosSpent += nanos; +1616 } +1617 } +1618 +1619 /** +1620 * Private data structure that wraps a Writer and its Path, also collecting statistics about the +1621 * data written to this output. +1622 */ +1623 private final static class WriterAndPath extends SinkWriter { +1624 final Path p; +1625 final Writer w; 1626 -1627 /** -1628 * Class that manages to replay edits from WAL files directly to assigned fail over region servers -1629 */ -1630 class LogReplayOutputSink extends OutputSink { -1631 private static final double BUFFER_THRESHOLD = 0.35; -1632 private static final String KEY_DELIMITER = "#"; -1633 -1634 private long waitRegionOnlineTimeOut; -1635 private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>()); -1636 private final Map<String, RegionServerWriter> writers = -1637 new ConcurrentHashMap<String, RegionServerWriter>(); -1638 // online encoded region name -> region location map -1639 private final Map<String, HRegionLocation> onlineRegions = -1640 new ConcurrentHashMap<String, HRegionLocation>(); -1641 -1642 private Map<TableName, HConnection> tableNameToHConnectionMap = Collections -1643 .synchronizedMap(new TreeMap<TableName, HConnection>()); -1644 /** -1645 * Map key -> value layout -1646 * {@literal <servername>:<table name> -> Queue<Row>} -1647 */ -1648 private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap = -1649 new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>(); -1650 private List<Throwable> thrown = new ArrayList<Throwable>(); -1651 -1652 // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling -1653 // table. It's a limitation of distributedLogReplay. Because log replay needs a region is -1654 // assigned and online before it can replay wal edits while regions of disabling/disabled table -1655 // won't be assigned by AM. We can retire this code after HBASE-8234. -1656 private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; -1657 private boolean hasEditsInDisablingOrDisabledTables = false; -1658 -1659 public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, -1660 int numWriters) { -1661 super(controller, entryBuffers, numWriters); -1662 this.waitRegionOnlineTimeOut = -1663 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, -1664 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); -1665 this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, -1666 entryBuffers, numWriters); -1667 this.logRecoveredEditsOutputSink.setReporter(reporter); -1668 } -1669 -1670 @Override -1671 public void append(RegionEntryBuffer buffer) throws IOException { -1672 List<Entry> entries = buffer.entryBuffer; -1673 if (entries.isEmpty()) { -1674 LOG.warn("got an empty buffer, skipping"); -1675 return; -1676 } -1677 -1678 // check if current region in a disabling or disabled table -1679 if (isTableDisabledOrDisabling(buffer.tableName)) { -1680 // need fall back to old way -1681 logRecoveredEditsOutputSink.append(buffer); -1682 hasEditsInDisablingOrDisabledTables = true; -1683 // store regions we have recovered so far -1684 addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName)); -1685 return; -1686 } -1687 -1688 // group entries by region servers -1689 groupEditsByServer(entries); -1690 -1691 // process workitems -1692 String maxLocKey = null; -1693 int maxSize = 0; -1694 List<Pair<HRegionLocation, Entry>> maxQueue = null; -1695 synchronized (this.serverToBufferQueueMap) { -1696 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry : -1697 this.serverToBufferQueueMap.entrySet()) { -1698 List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue(); -1699 if (curQueue.size() > maxSize) { -1700 maxSize = curQueue.size(); -1701 maxQueue = curQueue; -1702 maxLocKey = entry.getKey(); -1703 } -1704 } -1705 if (maxSize < minBatchSize -1706 && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) { -1707 // buffer more to process -1708 return; -1709 } else if (maxSize > 0) { -1710 this.serverToBufferQueueMap.remove(maxLocKey); -1711 } -1712 } -1713 -1714 if (maxSize > 0) { -1715 processWorkItems(maxLocKey, maxQueue); -1716 } -1717 } -1718 -1719 private void addToRecoveredRegions(String encodedRegionName) { -1720 if (!recoveredRegions.contains(encodedRegionName)) { -1721 recoveredRegions.add(encodedRegionName); +1627 WriterAndPath(final Path p, final Writer w) { +1628 this.p = p; +1629 this.w = w; +1630 } +1631 } +1632 +1633 /** +1634 * Class that manages to replay edits from WAL files directly to assigned fail over region servers +1635 */ +1636 class LogReplayOutputSink extends OutputSink { +1637 private static final double BUFFER_THRESHOLD = 0.35; +1638 private static final String KEY_DELIMITER = "#"; +1639 +1640 private long waitRegionOnlineTimeOut; +1641 private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>()); +1642 private final Map<String, RegionServerWriter> writers = +1643 new ConcurrentHashMap<String, RegionServerWriter>(); +1644 // online encoded region name -> region location map +1645 private final Map<String, HRegionLocation> onlineRegions = +1646 new ConcurrentHashMap<String, HRegionLocation>(); +1647 +1648 private Map<TableName, HConnection> tableNameToHConnectionMap = Collections +1649 .synchronizedMap(new TreeMap<TableName, HConnection>()); +1650 /** +1651 * Map key -> value layout +1652 * {@literal <servername>:<table name> -> Queue<Row>} +1653 */ +1654 private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap = +1655 new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>(); +1656 private List<Throwable> thrown = new ArrayList<Throwable>(); +1657 +1658 // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling +1659 // table. It's a limitation of distributedLogReplay. Because log replay needs a region is +1660 // assigned and online before it can replay wal edits while regions of disabling/disabled table +1661 // won't be assigned by AM. We can retire this code after HBASE-8234. +1662 private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; +1663 private boolean hasEditsInDisablingOrDisabledTables = false; +1664 +1665 public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, +1666 int numWriters) { +1667 super(controller, entryBuffers, numWriters); +1668 this.waitRegionOnlineTimeOut = +1669 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, +1670 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); +1671 this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, +1672 entryBuffers, numWriters); +1673 this.logRecoveredEditsOutputSink.setReporter(reporter); +1674 } +1675 +1676 @Override +1677 public void append(RegionEntryBuffer buffer) throws IOException { +1678 List<Entry> entries = buffer.entryBuffer; +1679 if (entries.isEmpty()) { +1680 LOG.warn("got an empty buffer, skipping"); +1681 return; +1682 } +1683 +1684 // check if current region in a disabling or disabled table +1685 if (isTableDisabledOrDisabling(buffer.tableName)) { +1686 // need fall back to old way +1687 logRecoveredEditsOutputSink.append(buffer); +1688 hasEditsInDisablingOrDisabledTables = true; +1689 // store regions we have recovered so far +1690 addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName)); +1691 return; +1692 } +1693 +1694 // group entries by region servers +1695 groupEditsByServer(entries); +1696 +1697 // process workitems +1698 String maxLocKey = null; +1699 int maxSize = 0; +1700 List<Pair<HRegionLocation, Entry>> maxQueue = null; +1701 synchronized (this.serverToBufferQueueMap) { +1702 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry : +1703 this.serverToBufferQueueMap.entrySet()) { +1704 List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue(); +1705 if (curQueue.size() > maxSize) { +1706 maxSize = curQueue.size(); +1707 maxQueue = curQueue; +1708 maxLocKey = entry.getKey(); +1709 } +1710 } +1711 if (maxSize < minBatchSize +1712 && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) { +1713 // buffer more to process +1714 return; +1715 } else if (maxSize > 0) { +1716 this.serverToBufferQueueMap.remove(maxLocKey); +1717 } +1718 } +1719 +1720 if (maxSize > 0) { +1721 processWorkItems(maxLocKey, maxQueue); 1722 } 1723 } 1724 -1725 /** -1726 * Helper function to group WALEntries to individual region servers -1727 * @throws IOException -1728 */ -1729 private void groupEditsByServer(List<Entry> entries) throws IOException { -1730 Set<TableName> nonExistentTables = null; -1731 Long cachedLastFlushedSequenceId = -1l; -1732 for (Entry entry : entries) { -1733 WALEdit edit = entry.getEdit(); -1734 TableName table = entry.getKey().getTablename(); -1735 // clear scopes which isn't needed for recovery -1736 entry.getKey().setScopes(null); -1737 String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); -1738 // skip edits of non-existent tables -1739 if (nonExistentTables != null && nonExistentTables.contains(table)) { -1740 this.skippedEdits.incrementAndGet(); -1741 continue; -1742 } -1743 -1744 Map<byte[], Long> maxStoreSequenceIds = null; -1745 boolean needSkip = false; -1746 HRegionLocation loc = null; -1747 String locKey = null; -1748 List<Cell> cells = edit.getCells(); -1749 List<Cell> skippedCells = new ArrayList<Cell>(); -1750 HConnection hconn = this.getConnectionByTableName(table); -1751 -1752 for (Cell cell : cells) { -1753 byte[] row = CellUtil.cloneRow(cell); -1754 byte[] family = CellUtil.cloneFamily(cell); -1755 boolean isCompactionEntry = false; -1756 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { -1757 CompactionDescriptor compaction = WALEdit.getCompaction(cell); -1758 if (compaction != null && compaction.hasRegionName()) { -1759 try { -1760 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName() -1761 .toByteArray()); -1762 row = regionName[1]; // startKey of the region -1763 family = compaction.getFamilyName().toByteArray(); -1764 isCompactionEntry = true; -1765 } catch (Exception ex) { -1766 LOG.warn("Unexpected exception received, ignoring " + ex); -1767 skippedCells.add(cell); -1768 continue; -1769 } -1770 } else { -1771 skippedCells.add(cell); -1772 continue; -1773 } -1774 } -1775 -1776 try { -1777 loc = -1778 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row, -1779 encodeRegionNameStr); -1780 // skip replaying the compaction if the region is gone -1781 if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase( -1782 loc.getRegionInfo().getEncodedName())) { -1783 LOG.info("Not replaying a compaction marker for an older region: " -1784 + encodeRegionNameStr); -1785 needSkip = true; -1786 } -1787 } catch (TableNotFoundException ex) { -1788 // table has been deleted so skip edits of the table -1789 LOG.info("Table " + table + " doesn't exist. Skip log replay for region " -1790 + encodeRegionNameStr); -1791 lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE); -1792 if (nonExistentTables == null) { -1793 nonExistentTables = new TreeSet<TableName>(); -1794 } -1795 nonExistentTables.add(table); -1796 this.skippedEdits.incrementAndGet(); -1797 needSkip = true; -1798 break; -1799 } -1800 -1801 cachedLastFlushedSequenceId = -1802 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); -1803 if (cachedLastFlushedSequenceId != null -1804 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { -1805 // skip the whole WAL entry -1806 this.skippedEdits.incrementAndGet(); -1807 needSkip = true; -1808 break; -1809 } else { -1810 if (maxStoreSequenceIds == null) { -1811 maxStoreSequenceIds = -1812 regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); -1813 } -1814 if (maxStoreSequenceIds != null) { -1815 Long maxStoreSeqId = maxStoreSequenceIds.get(family); -1816 if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { -1817 // skip current kv if column family doesn't exist anymore or already flushed -1818 skippedCells.add(cell); -1819 continue; -1820 } -1821 } -1822 } -1823 } -1824 -1825 // skip the edit -1826 if (loc == null || needSkip) continue; -1827 -1828 if (!skippedCells.isEmpty()) { -1829 cells.removeAll(skippedCells); -1830 } -1831 -1832 synchronized (serverToBufferQueueMap) { -1833 locKey = loc.getHostnamePort() + KEY_DELIMITER + table; -1834 List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey); -1835 if (queue == null) { -1836 queue = -1837 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>()); -1838 serverToBufferQueueMap.put(locKey, queue); -1839 } -1840 queue.add(new Pair<HRegionLocation, Entry>(loc, entry)); -1841 } -1842 // store regions we have recovered so far -1843 addToRecoveredRegions(loc.getRegionInfo().getEncodedName()); -1844 } -1845 } -1846 -1847 /** -1848 * Locate destination region based on table name & row. This function also makes sure the -1849 * destination region is online for replay. -1850 * @throws IOException -1851 */ -1852 private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, -1853 TableName table, byte[] row, String originalEncodedRegionName) throws IOException { -1854 // fetch location from cache -1855 HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); -1856 if(loc != null) return loc; -1857 // fetch location from hbase:meta directly without using cache to avoid hit old dead server -1858 loc = hconn.getRegionLocation(table, row, true); -1859 if (loc == null) { -1860 throw new IOException("Can't locate location for row:" + Bytes.toString(row) -1861 + " of table:" + table); -1862 } -1863 // check if current row moves to a different region due to region merge/split -1864 if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { -1865 // originalEncodedRegionName should have already flushed -1866 lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); -1867 HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); -1868 if (tmpLoc != null) return tmpLoc; -1869 } -1870 -1871 Long lastFlushedSequenceId = -1l; -1872 AtomicBoolean isRecovering = new AtomicBoolean(true); -1873 loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); -1874 if (!isRecovering.get()) { -1875 // region isn't in recovering at all because WAL file may contain a region that has -1876 // been moved to somewhere before hosting RS fails -1877 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); -1878 LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() -1879 + " because it's not in recovering."); -1880 } else { -1881 Long cachedLastFlushedSequenceId = -1882 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); -1883 -1884 // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will -1885 // update the value for the region -1886 RegionStoreSequenceIds ids = -1887 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, -1888 loc.getRegionInfo().getEncodedName()); -1889 if (ids != null) { -1890 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); -1891 Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); -1892 List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); -1893 for (StoreSequenceId id : maxSeqIdInStores) { -1894 storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); -1895 } -1896 regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); -1897 } -1898 -1899 if (cachedLastFlushedSequenceId == null -1900 || lastFlushedSequenceId > cachedLastFlushedSequenceId) { -1901 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); -1902 } -1903 } +1725 private void addToRecoveredRegions(String encodedRegionName) { +1726 if (!recoveredRegions.contains(encodedRegionName)) { +1727 recoveredRegions.add(encodedRegionName); +1728 } +1729 } +1730 +1731 /** +1732 * Helper function to group WALEntries to individual region servers +1733 * @throws IOException +1734 */ +1735 private void groupEditsByServer(List<Entry> entries) throws IOException { +1736 Set<TableName> nonExistentTables = null; +1737 Long cachedLastFlushedSequenceId = -1l; +1738 for (Entry entry : entries) { +1739 WALEdit edit = entry.getEdit(); +1740 TableName table = entry.getKey().getTablename(); +1741 // clear scopes which isn't needed for recovery +1742 entry.getKey().setScopes(null); +1743 String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); +1744 // skip edits of non-existent tables +1745 if (nonExistentTables != null && nonExistentTables.contains(table)) { +1746 this.skippedEdits.incrementAndGet(); +1747 continue; +1748 } +1749 +1750 Map<byte[], Long> maxStoreSequenceIds = null; +1751 boolean needSkip = false; +1752 HRegionLocation loc = null; +1753 String locKey = null; +1754 List<Cell> cells = edit.getCells(); +1755 List<Cell> skippedCells = new ArrayList<Cell>(); +1756 HConnection hconn = this.getConnectionByTableName(table); +1757 +1758 for (Cell cell : cells) { +1759 byte[] row = CellUtil.cloneRow(cell); +1760 byte[] family = CellUtil.cloneFamily(cell); +1761 boolean isCompactionEntry = false; +1762 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { +1763 CompactionDescriptor compaction = WALEdit.getCompaction(cell); +1764 if (compaction != null && compaction.hasRegionName()) { +1765 try { +1766 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName() +1767 .toByteArray()); +1768 row = regionName[1]; // startKey of the region +1769 family = compaction.getFamilyName().toByteArray(); +1770 isCompactionEntry = true; +1771 } catch (Exception ex) { +1772 LOG.warn("Unexpected exception received, ignoring " + ex); +1773 skippedCells.add(cell); +1774 continue; +1775 } +1776 } else { +1777 skippedCells.add(cell); +1778 continue; +1779 } +1780 } +1781 +1782 try { +1783 loc = +1784 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row, +1785 encodeRegionNameStr); +1786 // skip replaying the compaction if the region is gone +1787 if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase( +1788 loc.getRegionInfo().getEncodedName())) { +1789 LOG.info("Not replaying a compaction marker for an older region: " +1790 + encodeRegionNameStr); +1791 needSkip = true; +1792 } +1793 } catch (TableNotFoundException ex) { +1794 // table has been deleted so skip edits of the table +1795 LOG.info("Table " + table + " doesn't exist. Skip log replay for region " +1796 + encodeRegionNameStr); +1797 lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE); +1798 if (nonExistentTables == null) { +1799 nonExistentTables = new TreeSet<TableName>(); +1800 } +1801 nonExistentTables.add(table); +1802 this.skippedEdits.incrementAndGet(); +1803 needSkip = true; +1804 break; +1805 } +1806 +1807 cachedLastFlushedSequenceId = +1808 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); +1809 if (cachedLastFlushedSequenceId != null +1810 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { +1811 // skip the whole WAL entry +1812 this.skippedEdits.incrementAndGet(); +1813 needSkip = true; +1814 break; +1815 } else { +1816 if (maxStoreSequenceIds == null) { +1817 maxStoreSequenceIds = +1818 regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); +1819 } +1820 if (maxStoreSequenceIds != null) { +1821 Long maxStoreSeqId = maxStoreSequenceIds.get(family); +1822 if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { +1823 // skip current kv if column family doesn't exist anymore or already flushed +1824 skippedCells.add(cell); +1825 continue; +1826 } +1827 } +1828 } +1829 } +1830 +1831 // skip the edit +1832 if (loc == null || needSkip) continue; +1833 +1834 if (!skippedCells.isEmpty()) { +1835 cells.removeAll(skippedCells); +1836 } +1837 +1838 synchronized (serverToBufferQueueMap) { +1839 locKey = loc.getHostnamePort() + KEY_DELIMITER + table; +1840 List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey); +1841 if (queue == null) { +1842 queue = +1843 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>()); +1844 serverToBufferQueueMap.put(locKey, queue); +1845 } +1846 queue.add(new Pair<HRegionLocation, Entry>(loc, entry)); +1847 } +1848 // store regions we have recovered so far +1849 addToRecoveredRegions(loc.getRegionInfo().getEncodedName()); +1850 } +1851 } +1852 +1853 /** +1854 * Locate destination region based on table name & row. This function also makes sure the +1855 * destination region is online for replay. +1856 * @throws IOException +1857 */ +1858 private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, +1859 TableName table, byte[] row, String originalEncodedRegionName) throws IOException { +1860 // fetch location from cache +1861 HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); +1862 if(loc != null) return loc; +1863 // fetch location from hbase:meta directly without using cache to avoid hit old dead server +1864 loc = hconn.getRegionLocation(table, row, true); +1865 if (loc == null) { +1866 throw new IOException("Can't locate location for row:" + Bytes.toString(row) +1867 + " of table:" + table); +1868 } +1869 // check if current row moves to a different region due to region merge/split +1870 if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { +1871 // originalEncodedRegionName should have already flushed +1872 lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); +1873 HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); +1874 if (tmpLoc != null) return tmpLoc; +1875 } +1876 +1877 Long lastFlushedSequenceId = -1l; +1878 AtomicBoolean isRecovering = new AtomicBoolean(true); +1879 loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); +1880 if (!isRecovering.get()) { +1881 // region isn't in recovering at all because WAL file may contain a region that has +1882 // been moved to somewhere before hosting RS fails +1883 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); +1884 LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() +1885 + " because it's not in recovering."); +1886 } else { +1887 Long cachedLastFlushedSequenceId = +1888 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); +1889 +1890 // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will +1891 // update the value for the region +1892 RegionStoreSequenceIds ids = +1893 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, +1894 loc.getRegionInfo().getEncodedName()); +1895 if (ids != null) { +1896 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); +1897 Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); +1898 List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); +1899 for (StoreSequenceId id : maxSeqIdInStores) { +1900 storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); +1901 } +1902 regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); +1903 } 1904 -1905 onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); -1906 return loc; -1907 } -1908 -1909 private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions) -1910 throws IOException { -1911 RegionServerWriter rsw = null; -1912 -1913 long startTime = System.nanoTime(); -1914 try { -1915 rsw = getRegionServerWriter(key); -1916 rsw.sink.replayEntries(actions); -1917 -1918 // Pass along summary statistics -1919 rsw.incrementEdits(actions.size()); -1920 rsw.incrementNanoTime(System.nanoTime() - startTime); -1921 } catch (IOException e) { -1922 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; -1923 LOG.fatal(" Got while writing log entry to log", e); -1924 throw e; -1925 } -1926 } -1927 -1928 /** -1929 * Wait until region is online on the destination region server -1930 * @param loc -1931 * @param row -1932 * @param timeout How long to wait -1933 * @param isRecovering Recovering state of the region interested on destination region server. -1934 * @return True when region is online on the destination region server -1935 * @throws InterruptedException -1936 */ -1937 private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row, -1938 final long timeout, AtomicBoolean isRecovering) -1939 throws IOException { -1940 final long endTime = EnvironmentEdgeManager.currentTime() + timeout; -1941 final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, -1942 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); -1943 boolean reloadLocation = false; -1944 TableName tableName = loc.getRegionInfo().getTable(); -1945 int tries = 0; -1946 Throwable cause = null; -1947 while (endTime > EnvironmentEdgeManager.currentTime()) { -1948 try { -1949 // Try and get regioninfo from the hosting server. -1950 HConnection hconn = getConnectionByTableName(tableName); -1951 if(reloadLocation) { -1952 loc = hconn.getRegionLocation(tableName, row, true); -1953 } -1954 BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName()); -1955 HRegionInfo region = loc.getRegionInfo(); -1956 try { -1957 GetRegionInfoRequest request = -1958 RequestConverter.buildGetRegionInfoRequest(region.getRegionName()); -1959 GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request); -1960 if (HRegionInfo.convert(response.getRegionInfo()) != null) { -1961 isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true); -1962 return loc; -1963 } -1964 } catch (ServiceException se) { -1965 throw ProtobufUtil.getRemoteException(se); -