From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Sat, 30 Dec 2017 15:19:08 -0000
Message-Id: <48e5921cc27943509a2c2cfa5cb73cca@git.apache.org>
In-Reply-To: <8891f102c9ed4b5c8ec80f3146f95a47@git.apache.org>
References: <8891f102c9ed4b5c8ec80f3146f95a47@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [23/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/83bf6175/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.html
index 985778f..854ba52 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.html
@@ -662,1932 +662,1924 @@
 654      completeCompaction(toBeRemovedStoreFiles);
 655    }
 656
-657  private HStoreFile createStoreFileAndReader(final Path p) throws IOException {
-658    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
-659    return createStoreFileAndReader(info);
-660  }
-661
-662  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
-663    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
-664    HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
-665        this.family.getBloomFilterType(), isPrimaryReplicaStore());
-666    storeFile.initReader();
-667    return storeFile;
-668  }
-669
-670  /**
-671   * This message intends to inform the MemStore that next coming updates
-672   * are going to be part of the replaying edits from WAL
-673   */
-674  public void startReplayingFromWAL(){
-675    this.memstore.startReplayingFromWAL();
-676  }
-677
-678  /**
-679   * This message intends to inform the MemStore that the replaying edits from WAL
-680   * are done
-681   */
-682  public void stopReplayingFromWAL(){
-683    this.memstore.stopReplayingFromWAL();
-684  }
-685
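The pair of methods above form a bracket that callers are expected to hold
around WAL replay. A minimal usage sketch (the replay loop body is
hypothetical, not part of this class):

  store.startReplayingFromWAL();
  try {
    // replay each recovered WAL edit into the store,
    // e.g. via store.add(cell, memstoreSizing)
  } finally {
    store.stopReplayingFromWAL();
  }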
-686  /**
-687   * Adds a value to the memstore
-688   */
-689  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
-690    lock.readLock().lock();
-691    try {
-692      this.memstore.add(cell, memstoreSizing);
-693    } finally {
-694      lock.readLock().unlock();
-695    }
-696  }
-697
-698  /**
-699   * Adds the specified value to the memstore
-700   */
-701  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
-702    lock.readLock().lock();
-703    try {
-704      memstore.add(cells, memstoreSizing);
-705    } finally {
-706      lock.readLock().unlock();
-707    }
-708  }
-709
-710  @Override
-711  public long timeOfOldestEdit() {
-712    return memstore.timeOfOldestEdit();
-713  }
-714
-715  /**
-716   * @return All store files.
-717   */
-718  @Override
-719  public Collection<HStoreFile> getStorefiles() {
-720    return this.storeEngine.getStoreFileManager().getStorefiles();
-721  }
-722
-723  @Override
-724  public Collection<HStoreFile> getCompactedFiles() {
-725    return this.storeEngine.getStoreFileManager().getCompactedfiles();
-726  }
-727
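Note that the add() overloads above take the read lock even though they mutate
state: many handler threads may append to the memstore concurrently, while
operations that swap or clear the memstore (snapshot, close) take the write
lock and so exclude all writers. A self-contained sketch of that discipline
with invented names (the memstore stand-in is assumed internally thread-safe,
as the real one is):

  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class StoreLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile ConcurrentLinkedQueue<String> memstore = new ConcurrentLinkedQueue<>();

    void add(String cell) {
      lock.readLock().lock();   // shared: many add() calls may run at once
      try {
        memstore.add(cell);     // safe: the queue handles its own synchronization
      } finally {
        lock.readLock().unlock();
      }
    }

    ConcurrentLinkedQueue<String> snapshot() {
      lock.writeLock().lock();  // exclusive: no add() can run during the swap
      try {
        ConcurrentLinkedQueue<String> old = memstore;
        memstore = new ConcurrentLinkedQueue<>();
        return old;
      } finally {
        lock.writeLock().unlock();
      }
    }
  }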
-728  /**
-729   * This throws a WrongRegionException if the HFile does not fit in this region, or an
-730   * InvalidHFileException if the HFile is not valid.
-731   */
-732  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
-733    HFile.Reader reader = null;
-734    try {
-735      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
-736          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
-737      reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf,
-738        isPrimaryReplicaStore(), conf);
-739      reader.loadFileInfo();
-740
-741      Optional<byte[]> firstKey = reader.getFirstRowKey();
-742      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
-743      Optional<Cell> lk = reader.getLastKey();
-744      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
-745      byte[] lastKey = CellUtil.cloneRow(lk.get());
-746
-747      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
-748          " last=" + Bytes.toStringBinary(lastKey));
-749      LOG.debug("Region bounds: first=" +
-750          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
-751          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
-752
-753      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
-754        throw new WrongRegionException(
-755            "Bulk load file " + srcPath.toString() + " does not fit inside region "
-756            + this.getRegionInfo().getRegionNameAsString());
-757      }
-758
-759      if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
-760          HConstants.DEFAULT_MAX_FILE_SIZE)) {
-761        LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
-762            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
-763      }
-764
-765      if (verifyBulkLoads) {
-766        long verificationStartTime = EnvironmentEdgeManager.currentTime();
-767        LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
-768        Cell prevCell = null;
-769        HFileScanner scanner = reader.getScanner(false, false, false);
-770        scanner.seekTo();
-771        do {
-772          Cell cell = scanner.getCell();
-773          if (prevCell != null) {
-774            if (comparator.compareRows(prevCell, cell) > 0) {
-775              throw new InvalidHFileException("Previous row is greater than"
-776                  + " current row: path=" + srcPath + " previous="
-777                  + CellUtil.getCellKeyAsString(prevCell) + " current="
-778                  + CellUtil.getCellKeyAsString(cell));
-779            }
-780            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
-781              throw new InvalidHFileException("Previous key had different"
-782                  + " family compared to current key: path=" + srcPath
-783                  + " previous="
-784                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
-785                      prevCell.getFamilyLength())
-786                  + " current="
-787                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
-788                      cell.getFamilyLength()));
-789            }
-790          }
-791          prevCell = cell;
-792        } while (scanner.next());
-793        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
-794            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
-795            + " ms");
-796      }
-797    } finally {
-798      if (reader != null) reader.close();
-799    }
-800  }
-801
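The verifyBulkLoads pass above is a single forward scan that keeps only the
previous cell and fails fast on the first out-of-order row. The same technique
reduced to a self-contained sketch (Arrays.compareUnsigned requires Java 9+;
all names here are invented):

  import java.util.Arrays;
  import java.util.List;

  final class SortOrderCheck {
    // One pass, O(1) extra state: compare each row to its predecessor.
    static void assertSorted(List<byte[]> rows) {
      byte[] prev = null;
      for (byte[] row : rows) {
        if (prev != null && Arrays.compareUnsigned(prev, row) > 0) {
          throw new IllegalStateException(
              "previous row sorts after current row: " + Arrays.toString(row));
        }
        prev = row;
      }
    }
  }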
-802  /**
-803   * This method should only be called from Region. It is assumed that the ranges of values in the
-804   * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
-805   *
-806   * @param srcPathStr
-807   * @param seqNum sequence Id associated with the HFile
-808   */
-809  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
-810    Path srcPath = new Path(srcPathStr);
-811    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
-812  }
-813
-814  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
-815    Path srcPath = new Path(srcPathStr);
-816    try {
-817      fs.commitStoreFile(srcPath, dstPath);
-818    } finally {
-819      if (this.getCoprocessorHost() != null) {
-820        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
-821      }
-822    }
-823
-824    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
-825        + dstPath + " - updating store file list.");
-826
-827    HStoreFile sf = createStoreFileAndReader(dstPath);
-828    bulkLoadHFile(sf);
-829
-830    LOG.info("Successfully loaded store file " + srcPath + " into store " + this
-831        + " (new location: " + dstPath + ")");
-832
-833    return dstPath;
-834  }
-835
-836  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
-837    HStoreFile sf = createStoreFileAndReader(fileInfo);
-838    bulkLoadHFile(sf);
-839  }
-840
-841  private void bulkLoadHFile(HStoreFile sf) throws IOException {
-842    StoreFileReader r = sf.getReader();
-843    this.storeSize += r.length();
-844    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
-845
-846    // Append the new storefile into the list
-847    this.lock.writeLock().lock();
-848    try {
-849      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
-850    } finally {
-851      // We need the lock, as long as we are updating the storeFiles
-852      // or changing the memstore. Let us release it before calling
-853      // notifyChangeReadersObservers. See HBASE-4485 for a possible
-854      // deadlock scenario that could have happened if continue to hold
-855      // the lock.
-856      this.lock.writeLock().unlock();
-857    }
-858    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
-859    if (LOG.isTraceEnabled()) {
-860      String traceMessage = "BULK LOAD time,size,store size,store files ["
-861          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
-862          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
-863      LOG.trace(traceMessage);
-864    }
-865  }
-866
-867  /**
-868   * Close all the readers We don't need to worry about subsequent requests because the Region holds
-869   * a write lock that will prevent any more reads or writes.
-870   * @return the {@link StoreFile StoreFiles} that were previously being used.
-871   * @throws IOException on failure
-872   */
-873  public ImmutableCollection<HStoreFile> close() throws IOException {
-874    this.archiveLock.lock();
-875    this.lock.writeLock().lock();
-876    try {
-877      // Clear so metrics doesn't find them.
-878      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-879      Collection<HStoreFile> compactedfiles =
-880          storeEngine.getStoreFileManager().clearCompactedFiles();
-881      // clear the compacted files
-882      if (compactedfiles != null && !compactedfiles.isEmpty()) {
-883        removeCompactedfiles(compactedfiles);
-884      }
-885      if (!result.isEmpty()) {
-886        // initialize the thread pool for closing store files in parallel.
-887        ThreadPoolExecutor storeFileCloserThreadPool = this.region
-888            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
-889                + this.getColumnFamilyName());
-890
-891        // close each store file in parallel
-892        CompletionService<Void> completionService =
-893            new ExecutorCompletionService<>(storeFileCloserThreadPool);
-894        for (HStoreFile f : result) {
-895          completionService.submit(new Callable<Void>() {
-896            @Override
-897            public Void call() throws IOException {
-898              boolean evictOnClose =
-899                  cacheConf != null? cacheConf.shouldEvictOnClose(): true;
-900              f.closeStoreFile(evictOnClose);
-901              return null;
-902            }
-903          });
-904        }
-905
-906        IOException ioe = null;
-907        try {
-908          for (int i = 0; i < result.size(); i++) {
-909            try {
-910              Future<Void> future = completionService.take();
-911              future.get();
-912            } catch (InterruptedException e) {
-913              if (ioe == null) {
-914                ioe = new InterruptedIOException();
-915                ioe.initCause(e);
-916              }
-917            } catch (ExecutionException e) {
-918              if (ioe == null) ioe = new IOException(e.getCause());
-919            }
-920          }
-921        } finally {
-922          storeFileCloserThreadPool.shutdownNow();
-923        }
-924        if (ioe != null) throw ioe;
-925      }
-926      LOG.info("Closed " + this);
-927      return result;
-928    } finally {
-929      this.lock.writeLock().unlock();
-930      this.archiveLock.unlock();
-931    }
-932  }
-933
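close() above fans the per-file close out to a thread pool, then drains an
ExecutorCompletionService, remembering only the first failure so that every
task is still awaited before the pool is torn down. That drain pattern in
isolation (the AutoCloseable list is a stand-in for the store files):

  import java.io.IOException;
  import java.io.InterruptedIOException;
  import java.util.List;
  import java.util.concurrent.*;

  final class ParallelClose {
    static void closeAll(List<AutoCloseable> files, int threads) throws IOException {
      ExecutorService pool = Executors.newFixedThreadPool(threads);
      CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
      for (AutoCloseable f : files) {
        cs.submit(() -> { f.close(); return null; });   // one task per file
      }
      IOException ioe = null;
      try {
        for (int i = 0; i < files.size(); i++) {        // await every task
          try {
            cs.take().get();
          } catch (InterruptedException e) {
            if (ioe == null) { ioe = new InterruptedIOException(); ioe.initCause(e); }
          } catch (ExecutionException e) {
            if (ioe == null) ioe = new IOException(e.getCause());
          }
        }
      } finally {
        pool.shutdownNow();                             // tear down either way
      }
      if (ioe != null) throw ioe;                       // surface the first failure
    }
  }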
-934  /**
-935   * Snapshot this stores memstore. Call before running
-936   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
-937   * FlushLifeCycleTracker)}
-938   * so it has some work to do.
-939   */
-940  void snapshot() {
-941    this.lock.writeLock().lock();
-942    try {
-943      this.memstore.snapshot();
-944    } finally {
-945      this.lock.writeLock().unlock();
-946    }
-947  }
-948
-949  /**
-950   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
-951   * @param logCacheFlushId flush sequence number
-952   * @param snapshot
-953   * @param status
-954   * @param throughputController
-955   * @return The path name of the tmp file to which the store was flushed
-956   * @throws IOException if exception occurs during process
-957   */
-958  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
-959      MonitoredTask status, ThroughputController throughputController,
-960      FlushLifeCycleTracker tracker) throws IOException {
-961    // If an exception happens flushing, we let it out without clearing
-962    // the memstore snapshot. The old snapshot will be returned when we say
-963    // 'snapshot', the next time flush comes around.
-964    // Retry after catching exception when flushing, otherwise server will abort
-965    // itself
-966    StoreFlusher flusher = storeEngine.getStoreFlusher();
-967    IOException lastException = null;
-968    for (int i = 0; i < flushRetriesNumber; i++) {
-969      try {
-970        List<Path> pathNames =
-971            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
-972        Path lastPathName = null;
-973        try {
-974          for (Path pathName : pathNames) {
-975            lastPathName = pathName;
-976            validateStoreFile(pathName);
-977          }
-978          return pathNames;
-979        } catch (Exception e) {
-980          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
-981          if (e instanceof IOException) {
-982            lastException = (IOException) e;
-983          } else {
-984            lastException = new IOException(e);
-985          }
-986        }
-987      } catch (IOException e) {
-988        LOG.warn("Failed flushing store file, retrying num=" + i, e);
-989        lastException = e;
-990      }
-991      if (lastException != null && i < (flushRetriesNumber - 1)) {
-992        try {
-993          Thread.sleep(pauseTime);
-994        } catch (InterruptedException e) {
-995          IOException iie = new InterruptedIOException();
-996          iie.initCause(e);
-997          throw iie;
-998        }
-999      }
-1000    }
-1001    throw lastException;
-1002  }
-1003
-1004  /**
-1005   * @param path The pathname of the tmp file into which the store was flushed
-1006   * @param logCacheFlushId
-1007   * @param status
-1008   * @return store file created.
-1009   * @throws IOException
-1010   */
-1011  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
-1012      throws IOException {
-1013    // Write-out finished successfully, move into the right spot
-1014    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
-1015
-1016    status.setStatus("Flushing " + this + ": reopening flushed file");
-1017    HStoreFile sf = createStoreFileAndReader(dstPath);
-1018
-1019    StoreFileReader r = sf.getReader();
-1020    this.storeSize += r.length();
-1021    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
-1022
-1023    if (LOG.isInfoEnabled()) {
-1024      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
-1025          ", sequenceid=" + logCacheFlushId +
-1026          ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
-1027    }
-1028    return sf;
-1029  }
-1030
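flushCache above wraps the whole flush-and-validate attempt in a bounded retry
loop: remember the last IOException, sleep between attempts, and rethrow the
last failure only once the final attempt has been used. The control flow
reduced to a generic sketch (retries is assumed >= 1; all names are invented):

  import java.io.IOException;
  import java.io.InterruptedIOException;
  import java.util.concurrent.Callable;

  final class BoundedRetry {
    static <T> T run(Callable<T> action, int retries, long pauseMs) throws IOException {
      IOException last = null;
      for (int i = 0; i < retries; i++) {
        try {
          return action.call();                       // success short-circuits the loop
        } catch (Exception e) {
          last = (e instanceof IOException) ? (IOException) e : new IOException(e);
        }
        if (i < retries - 1) {
          try {
            Thread.sleep(pauseMs);                    // back off before retrying
          } catch (InterruptedException e) {
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(e);
            throw iie;                                // interruption aborts the retries
          }
        }
      }
      throw last;                                     // every attempt failed
    }
  }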
-1031  /**
-1032   * @param maxKeyCount
-1033   * @param compression Compression algorithm to use
-1034   * @param isCompaction whether we are creating a new file in a compaction
-1035   * @param includeMVCCReadpoint - whether to include MVCC or not
-1036   * @param includesTag - includesTag or not
-1037   * @return Writer for a new StoreFile in the tmp dir.
-1038   */
-1039  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
-1040  // compaction
-1041  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-1042      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-1043      boolean shouldDropBehind) throws IOException {
-1044    final CacheConfig writerCacheConf;
-1045    if (isCompaction) {
-1046      // Don't cache data on write on compactions.
-1047      writerCacheConf = new CacheConfig(cacheConf);
-1048      writerCacheConf.setCacheDataOnWrite(false);
-1049    } else {
-1050      writerCacheConf = cacheConf;
-1051    }
-1052    InetSocketAddress[] favoredNodes = null;
-1053    if (region.getRegionServerServices() != null) {
-1054      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
-1055          region.getRegionInfo().getEncodedName());
-1056    }
-1057    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
-1058        cryptoContext);
-1059    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
-1060    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
-1061        this.getFileSystem())
-1062            .withOutputDir(familyTempDir)
-1063            .withComparator(comparator)
-1064            .withBloomType(family.getBloomFilterType())
-1065            .withMaxKeyCount(maxKeyCount)
-1066            .withFavoredNodes(favoredNodes)
-1067            .withFileContext(hFileContext)
-1068            .withShouldDropCacheBehind(shouldDropBehind);
-1069    return builder.build();
-1070  }
-1071
-1072  private HFileContext createFileContext(Compression.Algorithm compression,
-1073      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
-1074    if (compression == null) {
-1075      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
-1076    }
-1077    HFileContext hFileContext = new HFileContextBuilder()
-1078        .withIncludesMvcc(includeMVCCReadpoint)
-1079        .withIncludesTags(includesTag)
-1080        .withCompression(compression)
-1081        .withCompressTags(family.isCompressTags())
-1082        .withChecksumType(checksumType)
-1083        .withBytesPerCheckSum(bytesPerChecksum)
-1084        .withBlockSize(blocksize)
-1085        .withHBaseCheckSum(true)
-1086        .withDataBlockEncoding(family.getDataBlockEncoding())
-1087        .withEncryptionContext(cryptoContext)
-1088        .withCreateTime(EnvironmentEdgeManager.currentTime())
-1089        .build();
-1090    return hFileContext;
-1091  }
-1092
+657  @VisibleForTesting
+658  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
+659    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+660    return createStoreFileAndReader(info);
+661  }
+662
+663  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
+664    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
+665    HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
+666        this.family.getBloomFilterType(), isPrimaryReplicaStore());
+667    storeFile.initReader();
+668    return storeFile;
+669  }
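The hunk above is the one substantive change in this diff: on the new side,
createStoreFileAndReader(Path) is widened from private to protected and gains
@VisibleForTesting, so a test can substitute or observe store-file
construction. The general shape of that seam, with entirely invented types:

  import java.io.IOException;
  import java.nio.file.Path;

  class Store {
    // Protected factory hook: production code calls it, tests may override it.
    protected Handle createStoreFileAndReader(Path p) throws IOException {
      return new Handle(p);
    }
    static class Handle {
      Handle(Path p) {}
    }
  }

  class CountingStore extends Store {
    int opened;                       // observed by the test
    @Override
    protected Handle createStoreFileAndReader(Path p) throws IOException {
      opened++;                       // instrument, then defer to production logic
      return super.createStoreFileAndReader(p);
    }
  }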
+670
+671  /**
+672   * This message intends to inform the MemStore that next coming updates
+673   * are going to be part of the replaying edits from WAL
+674   */
+675  public void startReplayingFromWAL(){
+676    this.memstore.startReplayingFromWAL();
+677  }
+678
+679  /**
+680   * This message intends to inform the MemStore that the replaying edits from WAL
+681   * are done
+682   */
+683  public void stopReplayingFromWAL(){
+684    this.memstore.stopReplayingFromWAL();
+685  }
+686
+687  /**
+688   * Adds a value to the memstore
+689   */
+690  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
+691    lock.readLock().lock();
+692    try {
+693      this.memstore.add(cell, memstoreSizing);
+694    } finally {
+695      lock.readLock().unlock();
+696    }
+697  }
+698
+699  /**
+700   * Adds the specified value to the memstore
+701   */
+702  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
+703    lock.readLock().lock();
+704    try {
+705      memstore.add(cells, memstoreSizing);
+706    } finally {
+707      lock.readLock().unlock();
+708    }
+709  }
+710
+711  @Override
+712  public long timeOfOldestEdit() {
+713    return memstore.timeOfOldestEdit();
+714  }
+715
+716  /**
+717   * @return All store files.
+718   */
+719  @Override
+720  public Collection<HStoreFile> getStorefiles() {
+721    return this.storeEngine.getStoreFileManager().getStorefiles();
+722  }
+723
+724  @Override
+725  public Collection<HStoreFile> getCompactedFiles() {
+726    return this.storeEngine.getStoreFileManager().getCompactedfiles();
+727  }
+728
+729  /**
+730   * This throws a WrongRegionException if the HFile does not fit in this region, or an
+731   * InvalidHFileException if the HFile is not valid.
+732   */
+733  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
+734    HFile.Reader reader = null;
+735    try {
+736      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+737          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
+738      reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf,
+739        isPrimaryReplicaStore(), conf);
+740      reader.loadFileInfo();
+741
+742      Optional<byte[]> firstKey = reader.getFirstRowKey();
+743      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
+744      Optional<Cell> lk = reader.getLastKey();
+745      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
+746      byte[] lastKey = CellUtil.cloneRow(lk.get());
+747
+748      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
+749          " last=" + Bytes.toStringBinary(lastKey));
+750      LOG.debug("Region bounds: first=" +
+751          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
+752          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
+753
+754      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
+755        throw new WrongRegionException(
+756            "Bulk load file " + srcPath.toString() + " does not fit inside region "
+757            + this.getRegionInfo().getRegionNameAsString());
+758      }
+759
+760      if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+761          HConstants.DEFAULT_MAX_FILE_SIZE)) {
+762        LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
+763            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
+764      }
+765
+766      if (verifyBulkLoads) {
+767        long verificationStartTime = EnvironmentEdgeManager.currentTime();
+768        LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
+769        Cell prevCell = null;
+770        HFileScanner scanner = reader.getScanner(false, false, false);
+771        scanner.seekTo();
+772        do {
+773          Cell cell = scanner.getCell();
+774          if (prevCell != null) {
+775            if (comparator.compareRows(prevCell, cell) > 0) {
+776              throw new InvalidHFileException("Previous row is greater than"
+777                  + " current row: path=" + srcPath + " previous="
+778                  + CellUtil.getCellKeyAsString(prevCell) + " current="
+779                  + CellUtil.getCellKeyAsString(cell));
+780            }
+781            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
+782              throw new InvalidHFileException("Previous key had different"
+783                  + " family compared to current key: path=" + srcPath
+784                  + " previous="
+785                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
+786                      prevCell.getFamilyLength())
+787                  + " current="
+788                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
+789                      cell.getFamilyLength()));
+790            }
+791          }
+792          prevCell = cell;
+793        } while (scanner.next());
+794        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
+795            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
+796            + " ms");
+797      }
+798    } finally {
+799      if (reader != null) reader.close();
+800    }
+801  }
+802
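assertBulkLoadHFileOk, repeated above on the new side of the diff, validates
the Optional results with Guava's Preconditions.checkState before
dereferencing them. The same guard written without the Guava dependency
(names invented):

  import java.util.Optional;

  final class PresenceGuard {
    // Validate first, then dereference, so a failure names its cause
    // instead of surfacing as a bare NoSuchElementException.
    static <T> T checkPresent(Optional<T> opt, String message) {
      if (!opt.isPresent()) {
        throw new IllegalStateException(message);
      }
      return opt.get();
    }
  }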
+803  /**
+804   * This method should only be called from Region. It is assumed that the ranges of values in the
+805   * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
+806   *
+807   * @param srcPathStr
+808   * @param seqNum sequence Id associated with the HFile
+809   */
+810  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+811    Path srcPath = new Path(srcPathStr);
+812    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+813  }
+814
+815  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
+816    Path srcPath = new Path(srcPathStr);
+817    try {
+818      fs.commitStoreFile(srcPath, dstPath);
+819    } finally {
+820      if (this.getCoprocessorHost() != null) {
+821        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
+822      }
+823    }
+824
+825    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+826        + dstPath + " - updating store file list.");
+827
+828    HStoreFile sf = createStoreFileAndReader(dstPath);
+829    bulkLoadHFile(sf);
+830
+831    LOG.info("Successfully loaded store file " + srcPath + " into store " + this
+832        + " (new location: " + dstPath + ")");
+833
+834    return dstPath;
+835  }
+836
+837  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
+838    HStoreFile sf = createStoreFileAndReader(fileInfo);
+839    bulkLoadHFile(sf);
+840  }
+841
+842  private void bulkLoadHFile(HStoreFile sf) throws IOException {
+843    StoreFileReader r = sf.getReader();
+844    this.storeSize += r.length();
+845    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
+846
+847    // Append the new storefile into the list
+848    this.lock.writeLock().lock();
+849    try {
+850      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
+851    } finally {
+852      // We need the lock, as long as we are updating the storeFiles
+853      // or changing the memstore. Let us release it before calling
+854      // notifyChangeReadersObservers. See HBASE-4485 for a possible
+855      // deadlock scenario that could have happened if continue to hold
+856      // the lock.
+857      this.lock.writeLock().unlock();
+858    }
+859    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
+860    if (LOG.isTraceEnabled()) {
+861      String traceMessage = "BULK LOAD time,size,store size,store files ["
+862          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
+863          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+864      LOG.trace(traceMessage);
+865    }
+866  }
+867
+868  /**
+869   * Close all the readers We don't need to worry about subsequent requests because the Region holds
+870   * a write lock that will prevent any more reads or writes.
+871   * @return the {@link StoreFile StoreFiles} that were previously being used.
+872   * @throws IOException on failure
+873   */
+874  public ImmutableCollection<HStoreFile> close() throws IOException {
+875    this.archiveLock.lock();
+876    this.lock.writeLock().lock();
+877    try {
+878      // Clear so metrics doesn't find them.
+879      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+880      Collection<HStoreFile> compactedfiles =
+881          storeEngine.getStoreFileManager().clearCompactedFiles();
+882      // clear the compacted files
+883      if (compactedfiles != null && !compactedfiles.isEmpty()) {
+884        removeCompactedfiles(compactedfiles);
+885      }
+886      if (!result.isEmpty()) {
+887        // initialize the thread pool for closing store files in parallel.
+888        ThreadPoolExecutor storeFileCloserThreadPool = this.region
+889            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+890                + this.getColumnFamilyName());
+891
+892        // close each store file in parallel
+893        CompletionService<Void> completionService =
+894            new ExecutorCompletionService<>(storeFileCloserThreadPool);
+895        for (HStoreFile f : result) {
+896          completionService.submit(new Callable<Void>() {
+897            @Override
+898            public Void call() throws IOException {
+899              boolean evictOnClose =
+900                  cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+901              f.closeStoreFile(evictOnClose);
+902              return null;
+903            }
+904          });
+905        }
+906
+907        IOException ioe = null;
+908        try {
+909          for (int i = 0; i < result.size(); i++) {
+910            try {
+911              Future<Void> future = completionService.take();
+912              future.get();
+913            } catch (InterruptedException e) {
+914              if (ioe == null) {
+915                ioe = new InterruptedIOException();
+916                ioe.initCause(e);
+917              }
+918            } catch (ExecutionException e) {
+919              if (ioe == null) ioe = new IOException(e.getCause());
+920            }
+921          }
+922        } finally {
+923          storeFileCloserThreadPool.shutdownNow();
+924        }
+925        if (ioe != null) throw ioe;
+926      }
+927      LOG.info("Closed " + this);
+928      return result;
+929    } finally {
+930      this.lock.writeLock().unlock();
+931      this.archiveLock.unlock();
+932    }
+933  }
+934
+935  /**
+936   * Snapshot this stores memstore. Call before running
+937   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
+938   * FlushLifeCycleTracker)}
+939   * so it has some work to do.
+940   */
+941  void snapshot() {
+942    this.lock.writeLock().lock();
+943    try {
+944      this.memstore.snapshot();
+945    } finally {
+946      this.lock.writeLock().unlock();
+947    }
+948  }
+949
+950  /**
+951   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
+952   * @param logCacheFlushId flush sequence number
+953   * @param snapshot
+954   * @param status
+955   * @param throughputController
+956   * @return The path name of the tmp file to which the store was flushed
+957   * @throws IOException if exception occurs during process
+958   */
+959  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
+960      MonitoredTask status, ThroughputController throughputController,
+961      FlushLifeCycleTracker tracker) throws IOException {
+962    // If an exception happens flushing, we let it out without clearing
+963    // the memstore snapshot. The old snapshot will be returned when we say
+964    // 'snapshot', the next time flush comes around.
+965    // Retry after catching exception when flushing, otherwise server will abort
+966    // itself
+967    StoreFlusher flusher = storeEngine.getStoreFlusher();
+968    IOException lastException = null;
+969    for (int i = 0; i < flushRetriesNumber; i++) {
+970      try {
+971        List<Path> pathNames =
+972            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
+973        Path lastPathName = null;
+974        try {
+975          for (Path pathName : pathNames) {
+976            lastPathName = pathName;
+977            validateStoreFile(pathName);
+978          }
+979          return pathNames;
+980        } catch (Exception e) {
+981          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
+982          if (e instanceof IOException) {
+983            lastException = (IOException) e;
+984          } else {
+985            lastException = new IOException(e);
+986          }
+987        }
+988      } catch (IOException e) {
+989        LOG.warn("Failed flushing store file, retrying num=" + i, e);
+990        lastException = e;
+991      }
+992      if (lastException != null && i < (flushRetriesNumber - 1)) {
+993        try {
+994          Thread.sleep(pauseTime);
+995        } catch (InterruptedException e) {
+996          IOException iie = new InterruptedIOException();
+997          iie.initCause(e);
+998          throw iie;
+999        }
+1000      }
+1001    }
+1002    throw lastException;
+1003  }
+1004
+1005  /**
+1006   * @param path The pathname of the tmp file into which the store was flushed
+1007   * @param logCacheFlushId
+1008   * @param status
+1009   * @return store file created.
+1010   * @throws IOException
+1011   */
+1012  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
+1013      throws IOException {
+1014    // Write-out finished successfully, move into the right spot
+1015    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
+1016
+1017    status.setStatus("Flushing " + this + ": reopening flushed file");
+1018    HStoreFile sf = createStoreFileAndReader(dstPath);
+1019
+1020    StoreFileReader r = sf.getReader();
+1021    this.storeSize += r.length();
+1022    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
+1023
+1024    if (LOG.isInfoEnabled()) {
+1025      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
+1026          ", sequenceid=" + logCacheFlushId +
+1027          ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+1028    }
+1029    return sf;
+1030  }
+1031
+1032  /**
+1033   * @param maxKeyCount
+1034   * @param compression Compression algorithm to use
+1035   * @param isCompaction whether we are creating a new file in a compaction
+1036   * @param includeMVCCReadpoint - whether to include MVCC or not
+1037   * @param includesTag - includesTag or not
+1038   * @return Writer for a new StoreFile in the tmp dir.
+1039   */
+1040  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
+1041  // compaction
+1042  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+1043      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+1044      boolean shouldDropBehind) throws IOException {
+1045    final CacheConfig writerCacheConf;
+1046    if (isCompaction) {
+1047      // Don't cache data on write on compactions.
+1048      writerCacheConf = new CacheConfig(cacheConf);
+1049      writerCacheConf.setCacheDataOnWrite(false);
+1050    } else {
+1051      writerCacheConf = cacheConf;
+1052    }
+1053    InetSocketAddress[] favoredNodes = null;
+1054    if (region.getRegionServerServices() != null) {
+1055      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
+1056          region.getRegionInfo().getEncodedName());
+1057    }
+1058    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
+1059        cryptoContext);
+1060    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
+1061    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
+1062        this.getFileSystem())
+1063            .withOutputDir(familyTempDir)
+1064            .withComparator(comparator)
+1065            .withBloomType(family.getBloomFilterType())
+1066            .withMaxKeyCount(maxKeyCount)
+1067            .withFavoredNodes(favoredNodes)
+1068            .withFileContext(hFileContext)
+1069            .withShouldDropCacheBehind(shouldDropBehind);
+1070    return builder.build();
+1071  }
+1072
+1073  private HFileContext createFileContext(Compression.Algorithm compression,
+1074      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
+1075    if (compression == null) {
+1076      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+1077    }
+1078    HFileContext hFileContext = new HFileContextBuilder()
+1079        .withIncludesMvcc(includeMVCCReadpoint)
+1080        .withIncludesTags(includesTag)
+1081        .withCompression(compression)
+1082        .withCompressTags(family.isCompressTags())
+1083        .withChecksumType(checksumType)
+1084        .withBytesPerCheckSum(bytesPerChecksum)
+1085        .withBlockSize(blocksize)
+1086        .withHBaseCheckSum(true)
+1087        .withDataBlockEncoding(family.getDataBlockEncoding())
+1088        .withEncryptionContext(cryptoContext)
+1089        .withCreateTime(EnvironmentEdgeManager.currentTime())
+1090        .build();
+1091    return hFileContext;
+1092  }
 1093
-1094  private long getTotalSize(Collection<HStoreFile> sfs) {
-1095    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
-1096  }
-1097
-1098  /**
-1099   * Change storeFiles adding into place the Reader produced by this new flush.
-1100   * @param sfs Store files
-1101   * @param snapshotId
-1102   * @throws IOException
-1103   * @return Whe