Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1C897200D76 for ; Sun, 3 Dec 2017 16:18:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 17BE4160C22; Sun, 3 Dec 2017 15:18:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91874160C20 for ; Sun, 3 Dec 2017 16:18:17 +0100 (CET) Received: (qmail 26137 invoked by uid 500); 3 Dec 2017 15:18:13 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 25594 invoked by uid 99); 3 Dec 2017 15:18:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Dec 2017 15:18:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03CEEF60EF; Sun, 3 Dec 2017 15:18:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Sun, 03 Dec 2017 15:18:35 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [25/51] [partial] hbase-site git commit: Published site at . archived-at: Sun, 03 Dec 2017 15:18:21 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c54c242b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatchOperation.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatchOperation.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatchOperation.html index 3edfbef..9707b2c 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatchOperation.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatchOperation.html @@ -2459,5936 +2459,5935 @@ 2451 } 2452 2453 for (HStore s : storesToFlush) { -2454 MemStoreSize flushableSize = s.getFlushableSize(); -2455 totalSizeOfFlushableStores.incMemStoreSize(flushableSize); -2456 storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), -2457 s.createFlushContext(flushOpSeqId, tracker)); -2458 // for writing stores to WAL -2459 committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); -2460 storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize); -2461 } -2462 -2463 // write the snapshot start to WAL -2464 if (wal != null && !writestate.readOnly) { -2465 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, -2466 getRegionInfo(), flushOpSeqId, committedFiles); -2467 // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH -2468 WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, -2469 mvcc); -2470 } -2471 -2472 // Prepare flush (take a snapshot) -2473 for (StoreFlushContext flush : storeFlushCtxs.values()) { -2474 flush.prepare(); -2475 } -2476 } catch (IOException ex) { -2477 doAbortFlushToWAL(wal, flushOpSeqId, committedFiles); -2478 throw ex; -2479 } finally { -2480 this.updatesLock.writeLock().unlock(); -2481 } -2482 String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " + -2483 "flushsize=" + totalSizeOfFlushableStores; -2484 status.setStatus(s); -2485 doSyncOfUnflushedWALChanges(wal, getRegionInfo()); -2486 return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, -2487 flushOpSeqId, flushedSeqId, totalSizeOfFlushableStores); -2488 } -2489 -2490 /** -2491 * Utility method broken out of internalPrepareFlushCache so that method is smaller. -2492 */ -2493 private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId) { -2494 if (!LOG.isInfoEnabled()) { -2495 return; -2496 } -2497 // Log a fat line detailing what is being flushed. -2498 StringBuilder perCfExtras = null; -2499 if (!isAllFamilies(storesToFlush)) { -2500 perCfExtras = new StringBuilder(); -2501 for (HStore store: storesToFlush) { -2502 perCfExtras.append("; ").append(store.getColumnFamilyName()); -2503 perCfExtras.append("=") -2504 .append(StringUtils.byteDesc(store.getFlushableSize().getDataSize())); -2505 } -2506 } -2507 LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + -2508 " column families, memstore=" + StringUtils.byteDesc(this.memstoreDataSize.get()) + -2509 ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + -2510 ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); -2511 } -2512 -2513 private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId, -2514 final Map<byte[], List<Path>> committedFiles) { -2515 if (wal == null) return; -2516 try { -2517 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, -2518 getRegionInfo(), flushOpSeqId, committedFiles); -2519 WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, -2520 mvcc); -2521 } catch (Throwable t) { -2522 LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + -2523 StringUtils.stringifyException(t)); -2524 // ignore this since we will be aborting the RS with DSE. -2525 } -2526 // we have called wal.startCacheFlush(), now we have to abort it -2527 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); -2528 } -2529 -2530 /** -2531 * Sync unflushed WAL changes. See HBASE-8208 for details -2532 */ -2533 private static void doSyncOfUnflushedWALChanges(final WAL wal, final RegionInfo hri) -2534 throws IOException { -2535 if (wal == null) { -2536 return; -2537 } -2538 try { -2539 wal.sync(); // ensure that flush marker is sync'ed -2540 } catch (IOException ioe) { -2541 wal.abortCacheFlush(hri.getEncodedNameAsBytes()); -2542 throw ioe; -2543 } -2544 } -2545 -2546 /** -2547 * @return True if passed Set is all families in the region. -2548 */ -2549 private boolean isAllFamilies(Collection<HStore> families) { -2550 return families == null || this.stores.size() == families.size(); -2551 } -2552 -2553 /** -2554 * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various -2555 * reasons. Ignores exceptions from WAL. Returns whether the write succeeded. -2556 * @param wal -2557 * @return whether WAL write was successful -2558 */ -2559 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) { -2560 if (writeFlushWalMarker && wal != null && !writestate.readOnly) { -2561 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, -2562 getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR)); -2563 try { -2564 WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, -2565 mvcc); -2566 return true; -2567 } catch (IOException e) { -2568 LOG.warn(getRegionInfo().getEncodedName() + " : " -2569 + "Received exception while trying to write the flush request to wal", e); -2570 } -2571 } -2572 return false; -2573 } -2574 -2575 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", -2576 justification="Intentional; notify is about completed flush") -2577 protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status, -2578 PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException { -2579 // prepare flush context is carried via PrepareFlushResult -2580 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs; -2581 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles; -2582 long startTime = prepareResult.startTime; -2583 long flushOpSeqId = prepareResult.flushOpSeqId; -2584 long flushedSeqId = prepareResult.flushedSeqId; -2585 -2586 String s = "Flushing stores of " + this; -2587 status.setStatus(s); -2588 if (LOG.isTraceEnabled()) LOG.trace(s); -2589 -2590 // Any failure from here on out will be catastrophic requiring server -2591 // restart so wal content can be replayed and put back into the memstore. -2592 // Otherwise, the snapshot content while backed up in the wal, it will not -2593 // be part of the current running servers state. -2594 boolean compactionRequested = false; -2595 long flushedOutputFileSize = 0; -2596 try { -2597 // A. Flush memstore to all the HStores. -2598 // Keep running vector of all store files that includes both old and the -2599 // just-made new flush store file. The new flushed file is still in the -2600 // tmp directory. -2601 -2602 for (StoreFlushContext flush : storeFlushCtxs.values()) { -2603 flush.flushCache(status); -2604 } -2605 -2606 // Switch snapshot (in memstore) -> new hfile (thus causing -2607 // all the store scanners to reset/reseek). -2608 Iterator<HStore> it = storesToFlush.iterator(); -2609 // stores.values() and storeFlushCtxs have same order -2610 for (StoreFlushContext flush : storeFlushCtxs.values()) { -2611 boolean needsCompaction = flush.commit(status); -2612 if (needsCompaction) { -2613 compactionRequested = true; -2614 } -2615 byte[] storeName = it.next().getColumnFamilyDescriptor().getName(); -2616 List<Path> storeCommittedFiles = flush.getCommittedFiles(); -2617 committedFiles.put(storeName, storeCommittedFiles); -2618 // Flush committed no files, indicating flush is empty or flush was canceled -2619 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) { -2620 MemStoreSize storeFlushableSize = prepareResult.storeFlushableSize.get(storeName); -2621 prepareResult.totalFlushableSize.decMemStoreSize(storeFlushableSize); -2622 } -2623 flushedOutputFileSize += flush.getOutputFileSize(); -2624 } -2625 storeFlushCtxs.clear(); -2626 -2627 // Set down the memstore size by amount of flush. -2628 this.decrMemStoreSize(prepareResult.totalFlushableSize); -2629 -2630 if (wal != null) { -2631 // write flush marker to WAL. If fail, we should throw DroppedSnapshotException -2632 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, -2633 getRegionInfo(), flushOpSeqId, committedFiles); -2634 WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, -2635 mvcc); -2636 } -2637 } catch (Throwable t) { -2638 // An exception here means that the snapshot was not persisted. -2639 // The wal needs to be replayed so its content is restored to memstore. -2640 // Currently, only a server restart will do this. -2641 // We used to only catch IOEs but its possible that we'd get other -2642 // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch -2643 // all and sundry. -2644 if (wal != null) { -2645 try { -2646 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, -2647 getRegionInfo(), flushOpSeqId, committedFiles); -2648 WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc); -2649 } catch (Throwable ex) { -2650 LOG.warn(getRegionInfo().getEncodedName() + " : " -2651 + "failed writing ABORT_FLUSH marker to WAL", ex); -2652 // ignore this since we will be aborting the RS with DSE. -2653 } -2654 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); -2655 } -2656 DroppedSnapshotException dse = new DroppedSnapshotException("region: " + -2657 Bytes.toStringBinary(getRegionInfo().getRegionName())); -2658 dse.initCause(t); -2659 status.abort("Flush failed: " + StringUtils.stringifyException(t)); -2660 -2661 // Callers for flushcache() should catch DroppedSnapshotException and abort the region server. -2662 // However, since we may have the region read lock, we cannot call close(true) here since -2663 // we cannot promote to a write lock. Instead we are setting closing so that all other region -2664 // operations except for close will be rejected. -2665 this.closing.set(true); -2666 -2667 if (rsServices != null) { -2668 // This is a safeguard against the case where the caller fails to explicitly handle aborting -2669 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse); -2670 } -2671 -2672 throw dse; -2673 } -2674 -2675 // If we get to here, the HStores have been written. -2676 if (wal != null) { -2677 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); -2678 } -2679 -2680 // Record latest flush time -2681 for (HStore store: storesToFlush) { -2682 this.lastStoreFlushTimeMap.put(store, startTime); -2683 } -2684 -2685 this.maxFlushedSeqId = flushedSeqId; -2686 this.lastFlushOpSeqId = flushOpSeqId; -2687 -2688 // C. Finally notify anyone waiting on memstore to clear: -2689 // e.g. checkResources(). -2690 synchronized (this) { -2691 notifyAll(); // FindBugs NN_NAKED_NOTIFY -2692 } -2693 -2694 long time = EnvironmentEdgeManager.currentTime() - startTime; -2695 long memstoresize = this.memstoreDataSize.get(); -2696 String msg = "Finished memstore flush of ~" -2697 + StringUtils.byteDesc(prepareResult.totalFlushableSize.getDataSize()) + "/" -2698 + prepareResult.totalFlushableSize.getDataSize() + ", currentsize=" -2699 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize -2700 + " for region " + this + " in " + time + "ms, sequenceid=" -2701 + flushOpSeqId + ", compaction requested=" + compactionRequested -2702 + ((wal == null) ? "; wal=null" : ""); -2703 LOG.info(msg); -2704 status.setStatus(msg); -2705 -2706 if (rsServices != null && rsServices.getMetrics() != null) { -2707 rsServices.getMetrics().updateFlush(time - startTime, -2708 prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize); -2709 } -2710 -2711 return new FlushResultImpl(compactionRequested ? -2712 FlushResult.Result.FLUSHED_COMPACTION_NEEDED : -2713 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); -2714 } -2715 -2716 /** -2717 * Method to safely get the next sequence number. -2718 * @return Next sequence number unassociated with any actual edit. -2719 * @throws IOException -2720 */ -2721 @VisibleForTesting -2722 protected long getNextSequenceId(final WAL wal) throws IOException { -2723 WriteEntry we = mvcc.begin(); -2724 mvcc.completeAndWait(we); -2725 return we.getWriteNumber(); -2726 } -2727 -2728 ////////////////////////////////////////////////////////////////////////////// -2729 // get() methods for client use. -2730 ////////////////////////////////////////////////////////////////////////////// -2731 -2732 @Override -2733 public RegionScannerImpl getScanner(Scan scan) throws IOException { -2734 return getScanner(scan, null); -2735 } -2736 -2737 @Override -2738 public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners) -2739 throws IOException { -2740 return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE); -2741 } -2742 -2743 private RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners, -2744 long nonceGroup, long nonce) throws IOException { -2745 startRegionOperation(Operation.SCAN); -2746 try { -2747 // Verify families are all valid -2748 if (!scan.hasFamilies()) { -2749 // Adding all families to scanner -2750 for (byte[] family : this.htableDescriptor.getColumnFamilyNames()) { -2751 scan.addFamily(family); -2752 } -2753 } else { -2754 for (byte[] family : scan.getFamilyMap().keySet()) { -2755 checkFamily(family); -2756 } -2757 } -2758 return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce); -2759 } finally { -2760 closeRegionOperation(Operation.SCAN); -2761 } -2762 } -2763 -2764 protected RegionScanner instantiateRegionScanner(Scan scan, -2765 List<KeyValueScanner> additionalScanners) throws IOException { -2766 return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE, -2767 HConstants.NO_NONCE); -2768 } -2769 -2770 protected RegionScannerImpl instantiateRegionScanner(Scan scan, -2771 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { -2772 if (scan.isReversed()) { -2773 if (scan.getFilter() != null) { -2774 scan.getFilter().setReversed(true); -2775 } -2776 return new ReversedRegionScannerImpl(scan, additionalScanners, this); -2777 } -2778 return new RegionScannerImpl(scan, additionalScanners, this, nonceGroup, nonce); -2779 } -2780 -2781 /** -2782 * Prepare a delete for a row mutation processor -2783 * @param delete The passed delete is modified by this method. WARNING! -2784 * @throws IOException -2785 */ -2786 public void prepareDelete(Delete delete) throws IOException { -2787 // Check to see if this is a deleteRow insert -2788 if(delete.getFamilyCellMap().isEmpty()){ -2789 for(byte [] family : this.htableDescriptor.getColumnFamilyNames()){ -2790 // Don't eat the timestamp -2791 delete.addFamily(family, delete.getTimeStamp()); -2792 } -2793 } else { -2794 for(byte [] family : delete.getFamilyCellMap().keySet()) { -2795 if(family == null) { -2796 throw new NoSuchColumnFamilyException("Empty family is invalid"); -2797 } -2798 checkFamily(family); -2799 } -2800 } -2801 } -2802 -2803 @Override -2804 public void delete(Delete delete) throws IOException { -2805 checkReadOnly(); -2806 checkResources(); -2807 startRegionOperation(Operation.DELETE); -2808 try { -2809 // All edits for the given row (across all column families) must happen atomically. -2810 doBatchMutate(delete); -2811 } finally { -2812 closeRegionOperation(Operation.DELETE); -2813 } -2814 } -2815 -2816 /** -2817 * Row needed by below method. -2818 */ -2819 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly"); -2820 -2821 /** -2822 * This is used only by unit tests. Not required to be a public API. -2823 * @param familyMap map of family to edits for the given family. -2824 * @throws IOException -2825 */ -2826 void delete(NavigableMap<byte[], List<Cell>> familyMap, -2827 Durability durability) throws IOException { -2828 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY); -2829 delete.setFamilyCellMap(familyMap); -2830 delete.setDurability(durability); -2831 doBatchMutate(delete); -2832 } -2833 -2834 /** -2835 * Set up correct timestamps in the KVs in Delete object. -2836 * <p>Caller should have the row and region locks. -2837 * @param mutation -2838 * @param familyMap -2839 * @param byteNow -2840 * @throws IOException -2841 */ -2842 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap, -2843 byte[] byteNow) throws IOException { -2844 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { -2845 -2846 byte[] family = e.getKey(); -2847 List<Cell> cells = e.getValue(); -2848 assert cells instanceof RandomAccess; -2849 -2850 Map<byte[], Integer> kvCount = new TreeMap<>(Bytes.BYTES_COMPARATOR); -2851 int listSize = cells.size(); -2852 for (int i=0; i < listSize; i++) { -2853 Cell cell = cells.get(i); -2854 // Check if time is LATEST, change to time of most recent addition if so -2855 // This is expensive. -2856 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP -2857 && PrivateCellUtil.isDeleteType(cell)) { -2858 byte[] qual = CellUtil.cloneQualifier(cell); -2859 -2860 Integer count = kvCount.get(qual); -2861 if (count == null) { -2862 kvCount.put(qual, 1); -2863 } else { -2864 kvCount.put(qual, count + 1); -2865 } -2866 count = kvCount.get(qual); -2867 -2868 Get get = new Get(CellUtil.cloneRow(cell)); -2869 get.setMaxVersions(count); -2870 get.addColumn(family, qual); -2871 if (coprocessorHost != null) { -2872 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, -2873 byteNow, get)) { -2874 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2875 } -2876 } else { -2877 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2878 } -2879 } else { -2880 PrivateCellUtil.updateLatestStamp(cell, byteNow, 0); -2881 } -2882 } -2883 } -2884 } -2885 -2886 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow) -2887 throws IOException { -2888 List<Cell> result = get(get, false); -2889 -2890 if (result.size() < count) { -2891 // Nothing to delete -2892 PrivateCellUtil.updateLatestStamp(cell, byteNow, 0); -2893 return; -2894 } -2895 if (result.size() > count) { -2896 throw new RuntimeException("Unexpected size: " + result.size()); -2897 } -2898 Cell getCell = result.get(count - 1); -2899 PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); -2900 } -2901 -2902 @Override -2903 public void put(Put put) throws IOException { -2904 checkReadOnly(); -2905 -2906 // Do a rough check that we have resources to accept a write. The check is -2907 // 'rough' in that between the resource check and the call to obtain a -2908 // read lock, resources may run out. For now, the thought is that this -2909 // will be extremely rare; we'll deal with it when it happens. -2910 checkResources(); -2911 startRegionOperation(Operation.PUT); -2912 try { -2913 // All edits for the given row (across all column families) must happen atomically. -2914 doBatchMutate(put); -2915 } finally { -2916 closeRegionOperation(Operation.PUT); -2917 } -2918 } -2919 -2920 /** -2921 * Class that tracks the progress of a batch operations, accumulating status codes and tracking -2922 * the index at which processing is proceeding. These batch operations may get split into -2923 * mini-batches for processing. -2924 */ -2925 private abstract static class BatchOperation<T> { -2926 protected final T[] operations; -2927 protected final OperationStatus[] retCodeDetails; -2928 protected final WALEdit[] walEditsFromCoprocessors; -2929 // reference family cell maps directly so coprocessors can mutate them if desired -2930 protected final Map<byte[], List<Cell>>[] familyCellMaps; -2931 -2932 protected final HRegion region; -2933 protected int nextIndexToProcess = 0; -2934 protected final ObservedExceptionsInBatch observedExceptions; -2935 //Durability of the batch (highest durability of all operations) -2936 protected Durability durability; -2937 protected boolean atomic = false; -2938 -2939 public BatchOperation(final HRegion region, T[] operations) { -2940 this.operations = operations; -2941 this.retCodeDetails = new OperationStatus[operations.length]; -2942 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); -2943 this.walEditsFromCoprocessors = new WALEdit[operations.length]; -2944 familyCellMaps = new Map[operations.length]; -2945 -2946 this.region = region; -2947 observedExceptions = new ObservedExceptionsInBatch(); -2948 durability = Durability.USE_DEFAULT; -2949 } -2950 -2951 /** -2952 * Visitor interface for batch operations -2953 */ -2954 @FunctionalInterface -2955 public interface Visitor { -2956 /** -2957 * @param index operation index -2958 * @return If true continue visiting remaining entries, break otherwise -2959 */ -2960 boolean visit(int index) throws IOException; -2961 } -2962 -2963 /** -2964 * Helper method for visiting pending/ all batch operations -2965 */ -2966 public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor) -2967 throws IOException { -2968 assert lastIndexExclusive <= this.size(); -2969 for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) { -2970 if (!pendingOnly || isOperationPending(i)) { -2971 if (!visitor.visit(i)) { -2972 break; -2973 } -2974 } -2975 } -2976 } -2977 -2978 public abstract Mutation getMutation(int index); -2979 public abstract long getNonceGroup(int index); -2980 public abstract long getNonce(int index); -2981 /** This method is potentially expensive and useful mostly for non-replay CP path. */ -2982 public abstract Mutation[] getMutationsForCoprocs(); -2983 public abstract boolean isInReplay(); -2984 public abstract long getOrigLogSeqNum(); -2985 public abstract void startRegionOperation() throws IOException; -2986 public abstract void closeRegionOperation() throws IOException; -2987 -2988 /** -2989 * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs -2990 * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on -2991 * entire batch and will be called from outside of class to check and prepare batch. This can -2992 * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a -2993 * 'for' loop over mutations. -2994 */ -2995 public abstract void checkAndPrepare() throws IOException; -2996 -2997 /** -2998 * Implement any Put request specific check and prepare logic here. Please refer to -2999 * {@link #checkAndPrepareMutation(Mutation, long)} for how its used. -3000 */ -3001 protected abstract void checkAndPreparePut(final Put p) throws IOException; -3002 -3003 /** -3004 * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell -3005 * count, tags and timestamp for all cells of all operations in a mini-batch. -3006 */ -3007 public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> -3008 miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException; -3009 -3010 /** -3011 * Write mini-batch operations to MemStore -3012 */ -3013 public abstract WriteEntry writeMiniBatchOperationsToMemStore( -3014 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry) -3015 throws IOException; -3016 -3017 protected void writeMiniBatchOperationsToMemStore( -3018 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber) -3019 throws IOException { -3020 MemStoreSizing memStoreAccounting = new MemStoreSizing(); -3021 visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { -3022 // We need to update the sequence id for following reasons. -3023 // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. -3024 // 2) If no WAL, FSWALEntry won't be used -3025 // we use durability of the original mutation for the mutation passed by CP. -3026 if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) { -3027 region.updateSequenceId(familyCellMaps[index].values(), writeNumber); -3028 } -3029 applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting); -3030 return true; -3031 }); -3032 // update memStore size -3033 region.addAndGetMemStoreSize(memStoreAccounting); -3034 } -3035 -3036 public boolean isDone() { -3037 return nextIndexToProcess == operations.length; -3038 } -3039 -3040 public int size() { -3041 return operations.length; -3042 } -3043 -3044 public boolean isOperationPending(int index) { -3045 return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN; -3046 } -3047 -3048 public List<UUID> getClusterIds() { -3049 assert size() != 0; -3050 return getMutation(0).getClusterIds(); -3051 } -3052 -3053 boolean isAtomic() { -3054 return atomic; -3055 } -3056 -3057 /** -3058 * Helper method that checks and prepares only one mutation. This can be used to implement -3059 * {@link #checkAndPrepare()} for entire Batch. -3060 * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called -3061 * after prePut()/ preDelete() CP hooks are run for the mutation -3062 */ -3063 protected void checkAndPrepareMutation(Mutation mutation, final long timestamp) -3064 throws IOException { -3065 region.checkRow(mutation.getRow(), "batchMutate"); -3066 if (mutation instanceof Put) { -3067 // Check the families in the put. If bad, skip this one. -3068 checkAndPreparePut((Put) mutation); -3069 region.checkTimestamps(mutation.getFamilyCellMap(), timestamp); -3070 } else { -3071 region.prepareDelete((Delete) mutation); -3072 } -3073 } -3074 -3075 protected void checkAndPrepareMutation(int index, long timestamp) throws IOException { -3076 Mutation mutation = getMutation(index); -3077 try { -3078 this.checkAndPrepareMutation(mutation, timestamp); -3079 -3080 // store the family map reference to allow for mutations -3081 familyCellMaps[index] = mutation.getFamilyCellMap(); -3082 // store durability for the batch (highest durability of all operations in the batch) -3083 Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); -3084 if (tmpDur.ordinal() > durability.ordinal()) { -3085 durability = tmpDur; -3086 } -3087 } catch (NoSuchColumnFamilyException nscfe) { -3088 final String msg = "No such column family in batch mutation. "; -3089 if (observedExceptions.hasSeenNoSuchFamily()) { -3090 LOG.warn(msg + nscfe.getMessage()); -3091 } else { -3092 LOG.warn(msg, nscfe); -3093 observedExceptions.sawNoSuchFamily(); -3094 } -3095 retCodeDetails[index] = new OperationStatus( -3096 OperationStatusCode.BAD_FAMILY, nscfe.getMessage()); -3097 if (isAtomic()) { // fail, atomic means all or none -3098 throw nscfe; -3099 } -3100 } catch (FailedSanityCheckException fsce) { -3101 final String msg = "Batch Mutation did not pass sanity check. "; -3102 if (observedExceptions.hasSeenFailedSanityCheck()) { -3103 LOG.warn(msg + fsce.getMessage()); -3104 } else { -3105 LOG.warn(msg, fsce); -3106 observedExceptions.sawFailedSanityCheck(); -3107 } -3108 retCodeDetails[index] = new OperationStatus( -3109 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); -3110 if (isAtomic()) { -3111 throw fsce; -3112 } -3113 } catch (WrongRegionException we) { -3114 final String msg = "Batch mutation had a row that does not belong to this region. "; -3115 if (observedExceptions.hasSeenWrongRegion()) { -3116 LOG.warn(msg + we.getMessage()); -3117 } else { -3118 LOG.warn(msg, we); -3119 observedExceptions.sawWrongRegion(); -3120 } -3121 retCodeDetails[index] = new OperationStatus( -3122 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); -3123 if (isAtomic()) { -3124 throw we; -3125 } -3126 } -3127 } -3128 -3129 /** -3130 * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which -3131 * a row lock can be acquired. All mutations with locked rows are considered to be -3132 * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch -3133 * is window over {@link BatchOperation} and contains contiguous pending operations. -3134 * -3135 * @param acquiredRowLocks keeps track of rowLocks acquired. -3136 */ -3137 public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch( -3138 List<RowLock> acquiredRowLocks) throws IOException { -3139 int readyToWriteCount = 0; -3140 int lastIndexExclusive = 0; -3141 for (; lastIndexExclusive < size(); lastIndexExclusive++) { -3142 if (!isOperationPending(lastIndexExclusive)) { -3143 continue; -3144 } -3145 Mutation mutation = getMutation(lastIndexExclusive); -3146 // If we haven't got any rows in our batch, we should block to get the next one. -3147 RowLock rowLock = null; -3148 try { -3149 // if atomic then get exclusive lock, else shared lock -3150 rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); -3151 } catch (TimeoutIOException e) { -3152 // We will retry when other exceptions, but we should stop if we timeout . -3153 throw e; -3154 } catch (IOException ioe) { -3155 LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); -3156 if (isAtomic()) { // fail, atomic means all or none -3157 throw ioe; -3158 } -3159 } -3160 if (rowLock == null) { -3161 // We failed to grab another lock -3162 if (isAtomic()) { -3163 throw new IOException("Can't apply all operations atomically!"); -3164 } -3165 break; // Stop acquiring more rows for this batch -3166 } else { -3167 acquiredRowLocks.add(rowLock); -3168 } -3169 readyToWriteCount++; -3170 } -3171 return createMiniBatch(lastIndexExclusive, readyToWriteCount); -3172 } -3173 -3174 protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastIndexExclusive, -3175 final int readyToWriteCount) { -3176 return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails, -3177 walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount); -3178 } -3179 -3180 /** -3181 * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are -3182 * present, they are merged to result WALEdit. -3183 */ -3184 public List<Pair<NonceKey, WALEdit>> buildWALEdits( -3185 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { -3186 List<Pair<NonceKey, WALEdit>> walEdits = new ArrayList<>(); -3187 -3188 visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() { -3189 private Pair<NonceKey, WALEdit> curWALEditForNonce; -3190 @Override -3191 public boolean visit(int index) throws IOException { -3192 Mutation m = getMutation(index); -3193 // we use durability of the original mutation for the mutation passed by CP. -3194 if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) { -3195 region.recordMutationWithoutWal(m.getFamilyCellMap()); -3196 return true; -3197 } -3198 -3199 // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each. -3200 // Given how nonce keys are originally written, these should be contiguous. -3201 // They don't have to be, it will still work, just write more WALEdits than needed. -3202 long nonceGroup = getNonceGroup(index); -3203 long nonce = getNonce(index); -3204 if (curWALEditForNonce == null || -3205 curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup || -3206 curWALEditForNonce.getFirst().getNonce() != nonce) { -3207 curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce), -3208 new WALEdit(miniBatchOp.getCellCount(), isInReplay())); -3209 walEdits.add(curWALEditForNonce); -3210 } -3211 WALEdit walEdit = curWALEditForNonce.getSecond(); -3212 -3213 // Add WAL edits by CP -3214 WALEdit fromCP = walEditsFromCoprocessors[index]; -3215 if (fromCP != null) { -3216 for (Cell cell : fromCP.getCells()) { -3217 walEdit.add(cell); -3218 } -3219 } -3220 addFamilyMapToWALEdit(familyCellMaps[index], walEdit); -3221 -3222 return true; -3223 } -3224 }); -3225 return walEdits; -3226 } -3227 -3228 /** -3229 * This method completes mini-batch operations by calling postBatchMutate() CP hook (if -3230 * required) and completing mvcc. -3231 */ -3232 public void completeMiniBatchOperations( -3233 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry) -3234 throws IOException { -3235 if (writeEntry != null) { -3236 region.mvcc.completeAndWait(writeEntry); -3237 } -3238 } -3239 -3240 public void doPostOpCleanupForMiniBatch( -3241 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit, -3242 boolean success) throws IOException {} -3243 -3244 /** -3245 * Atomically apply the given map of family->edits to the memstore. -3246 * This handles the consistency control on its own, but the caller -3247 * should already have locked updatesLock.readLock(). This also does -3248 * <b>not</b> check the families for validity. -3249 * -3250 * @param familyMap Map of Cells by family -3251 */ -3252 protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap, -3253 MemStoreSizing memstoreAccounting) throws IOException { -3254 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { -3255 byte[] family = e.getKey(); -3256 List<Cell> cells = e.getValue(); -3257 assert cells instanceof RandomAccess; -3258 region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting); -3259 } -3260 } -3261 -3262 /** -3263 * Append the given map of family->edits to a WALEdit data structure. -3264 * This does not write to the WAL itself. -3265 * @param familyMap map of family->edits -3266 * @param walEdit the destination entry to append into -3267 */ -3268 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap, -3269 WALEdit walEdit) { -3270 for (List<Cell> edits : familyMap.values()) { -3271 // Optimization: 'foreach' loop is not used. See: -3272 // HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects -3273 assert edits instanceof RandomAccess; -3274 int listSize = edits.size(); -3275 for (int i=0; i < listSize; i++) { -3276 Cell cell = edits.get(i); -3277 walEdit.add(cell); -3278 } -3279 } -3280 } -3281 } -3282 -3283 /** -3284 * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most -3285 * of the logic is same. -3286 */ -3287 static class MutationBatchOperation extends BatchOperation<Mutation> { -3288 private long nonceGroup; -3289 private long nonce; -3290 public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic, -3291 long nonceGroup, long nonce) { -3292 super(region, operations); -3293 this.atomic = atomic; -3294 this.nonceGroup = nonceGroup; -3295 this.nonce = nonce; -3296 } -3297 -3298 @Override -3299 public Mutation getMutation(int index) { -3300 return this.operations[index]; -3301 } -3302 -3303 @Override -3304 public long getNonceGroup(int index) { -3305 return nonceGroup; -3306 } -3307