From: misty@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Fri, 18 Dec 2015 16:42:38 -0000
In-Reply-To: <2b06700c24844cf38cd4158783c0e9ba@git.apache.org>
References: <2b06700c24844cf38cd4158783c0e9ba@git.apache.org>
Subject: [05/48] hbase-site git commit: Published site at 4bfeccb87a94cfe232ea8fc9a6f40ff5b8d3b1c5.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d917c66a/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
----------------------------------------------------------------------
diff --git a/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html b/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
index f8ae7d0..ced1bbd 100644
--- a/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
+++ b/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
@@ -435,654 +435,655 @@
 425 * @param hfilesDir directory containing list of hfiles to be loaded into the table
 426 * @param table table to which hfiles should be loaded
 427 * @param queue queue which needs to be loaded into the table
-428 * @throws IOException If any I/O or network error occurred
-429 */
-430 public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
-431 boolean validateHFile) throws IOException {
-432 discoverLoadQueue(queue, hfofDir, validateHFile);
-433 validateFamiliesInHFiles(table, queue);
-434 }
-435
-436 // Initialize a thread pool
-437 private ExecutorService createExecutorService() {
-438 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-439 builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-440 ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
-441 new LinkedBlockingQueue<Runnable>(), builder.build());
-442 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
-443 return pool;
-444 }
-445
-446 /**
-447 * Checks whether there is any invalid family name in HFiles to be bulk loaded.
-448 */
-449 private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
-450 throws IOException {
-451 Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
-452 List<String> familyNames = new ArrayList<String>(families.size());
-453 for (HColumnDescriptor family : families) {
-454 familyNames.add(family.getNameAsString());
-455 }
-456 List<String> unmatchedFamilies = new ArrayList<String>();
-457 Iterator<LoadQueueItem> queueIter = queue.iterator();
-458 while (queueIter.hasNext()) {
-459 LoadQueueItem lqi = queueIter.next();
-460 String familyNameInHFile = Bytes.toString(lqi.family);
-461 if (!familyNames.contains(familyNameInHFile)) {
-462 unmatchedFamilies.add(familyNameInHFile);
-463 }
-464 }
-465 if (unmatchedFamilies.size() > 0) {
-466 String msg =
-467 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
-468 + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
-469 + familyNames;
-470 LOG.error(msg);
-471 throw new IOException(msg);
-472 }
-473 }
-474
-475 /**
-476 * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-477 * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
-478 * {@link
-479 * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
-480 * @param table Table to which these hfiles should be loaded to
-481 * @param conn Connection to use
-482 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
-483 * @param startEndKeys starting and ending row keys of the region
-484 */
-485 public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
-486 Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-487 ExecutorService pool = null;
-488 try {
-489 pool = createExecutorService();
-490 Multimap<ByteBuffer, LoadQueueItem> regionGroups =
-491 groupOrSplitPhase(table, pool, queue, startEndKeys);
-492 bulkLoadPhase(table, conn, pool, queue, regionGroups);
-493 } finally {
-494 if (pool != null) {
-495 pool.shutdown();
-496 }
-497 }
-498 }
-499
-500 /**
-501 * This takes the LQI's grouped by likely regions and attempts to bulk load
-502 * them. Any failures are re-queued for another pass with the
-503 * groupOrSplitPhase.
-504 */
-505 protected void bulkLoadPhase(final Table table, final Connection conn,
-506 ExecutorService pool, Deque<LoadQueueItem> queue,
-507 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
-508 // atomically bulk load the groups.
-509 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
-510 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
-511 final byte[] first = e.getKey().array();
-512 final Collection<LoadQueueItem> lqis = e.getValue();
-513
-514 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
-515 @Override
-516 public List<LoadQueueItem> call() throws Exception {
-517 List<LoadQueueItem> toRetry =
-518 tryAtomicRegionLoad(conn, table.getName(), first, lqis);
-519 return toRetry;
-520 }
-521 };
-522 loadingFutures.add(pool.submit(call));
-523 }
-524
-525 // get all the results.
-526 for (Future<List<LoadQueueItem>> future : loadingFutures) {
-527 try {
-528 List<LoadQueueItem> toRetry = future.get();
-529
-530 // LQIs that are requeued to be regrouped.
-531 queue.addAll(toRetry);
-532
-533 } catch (ExecutionException e1) {
-534 Throwable t = e1.getCause();
-535 if (t instanceof IOException) {
-536 // At this point something unrecoverable has happened.
-537 // TODO Implement bulk load recovery
-538 throw new IOException("BulkLoad encountered an unrecoverable problem", t);
-539 }
-540 LOG.error("Unexpected execution exception during bulk load", e1);
-541 throw new IllegalStateException(t);
-542 } catch (InterruptedException e1) {
-543 LOG.error("Unexpected interrupted exception during bulk load", e1);
-544 throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
-545 }
-546 }
-547 }
-548
-549 private boolean checkHFilesCountPerRegionPerFamily(
-550 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
-551 for (Entry<ByteBuffer,
-552 ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
-553 final Collection<LoadQueueItem> lqis = e.getValue();
-554 HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
-555 for (LoadQueueItem lqi: lqis) {
-556 MutableInt count = filesMap.get(lqi.family);
-557 if (count == null) {
-558 count = new MutableInt();
-559 filesMap.put(lqi.family, count);
-560 }
-561 count.increment();
-562 if (count.intValue() > maxFilesPerRegionPerFamily) {
-563 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
-564 + " hfiles to family " + Bytes.toStringBinary(lqi.family)
-565 + " of region with start key "
-566 + Bytes.toStringBinary(e.getKey()));
-567 return false;
-568 }
-569 }
-570 }
-571 return true;
-572 }
-573
-574 /**
-575 * @return A map that groups LQI by likely bulk load region targets.
-576 */
-577 private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
-578 ExecutorService pool, Deque<LoadQueueItem> queue,
-579 final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-580 // <region start key, LQI> need synchronized only within this scope of this
-581 // phase because of the puts that happen in futures.
-582 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
-583 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
-584
-585 // drain LQIs and figure out bulk load groups
-586 Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
-587 while (!queue.isEmpty()) {
-588 final LoadQueueItem item = queue.remove();
-589
-590 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
-591 @Override
-592 public List<LoadQueueItem> call() throws Exception {
-593 List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
-594 return splits;
-595 }
-596 };
-597 splittingFutures.add(pool.submit(call));
-598 }
-599 // get all the results. All grouping and splitting must finish before
-600 // we can attempt the atomic loads.
-601 for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
-602 try {
-603 List<LoadQueueItem> splits = lqis.get();
-604 if (splits != null) {
-605 queue.addAll(splits);
-606 }
-607 } catch (ExecutionException e1) {
-608 Throwable t = e1.getCause();
-609 if (t instanceof IOException) {
-610 LOG.error("IOException during splitting", e1);
-611 throw (IOException)t; // would have been thrown if not parallelized,
-612 }
-613 LOG.error("Unexpected execution exception during splitting", e1);
-614 throw new IllegalStateException(t);
-615 } catch (InterruptedException e1) {
-616 LOG.error("Unexpected interrupted exception during splitting", e1);
-617 throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
-618 }
-619 }
-620 return regionGroups;
-621 }
-622
-623 // unique file name for the table
-624 private String getUniqueName() {
-625 return UUID.randomUUID().toString().replaceAll("-", "");
-626 }
-627
-628 protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
-629 final Table table, byte[] startKey,
-630 byte[] splitKey) throws IOException {
-631 final Path hfilePath = item.hfilePath;
-632
-633 // We use a '_' prefix which is ignored when walking directory trees
-634 // above.
-635 final String TMP_DIR = "_tmp";
-636 Path tmpDir = item.hfilePath.getParent();
-637 if (!tmpDir.getName().equals(TMP_DIR)) {
-638 tmpDir = new Path(tmpDir, TMP_DIR);
-639 }
-640
-641 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-642 "region. Splitting...");
-643
-644 String uniqueName = getUniqueName();
-645 HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
-646
-647 Path botOut = new Path(tmpDir, uniqueName + ".bottom");
-648 Path topOut = new Path(tmpDir, uniqueName + ".top");
-649 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
-650
-651 FileSystem fs = tmpDir.getFileSystem(getConf());
-652 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
-653 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
-654 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
-655
-656 // Add these back at the *front* of the queue, so there's a lower
-657 // chance that the region will just split again before we get there.
-658 List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
-659 lqis.add(new LoadQueueItem(item.family, botOut));
-660 lqis.add(new LoadQueueItem(item.family, topOut));
-661
-662 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-663 return lqis;
-664 }
-665
-666 /**
-667 * Attempt to assign the given load queue item into its target region group.
-668 * If the hfile boundary no longer fits into a region, physically splits
-669 * the hfile such that the new bottom half will fit and returns the list of
-670 * LQI's corresponding to the resultant hfiles.
-671 *
-672 * protected for testing
-673 * @throws IOException
-674 */
-675 protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-676 final LoadQueueItem item, final Table table,
-677 final Pair<byte[][], byte[][]> startEndKeys)
-678 throws IOException {
-679 final Path hfilePath = item.hfilePath;
-680 // fs is the source filesystem
-681 if (fs == null) {
-682 fs = hfilePath.getFileSystem(getConf());
-683 }
-684 HFile.Reader hfr = HFile.createReader(fs, hfilePath,
-685 new CacheConfig(getConf()), getConf());
-686 final byte[] first, last;
-687 try {
-688 hfr.loadFileInfo();
-689 first = hfr.getFirstRowKey();
-690 last = hfr.getLastRowKey();
-691 } finally {
-692 hfr.close();
-693 }
-694
-695 LOG.info("Trying to load hfile=" + hfilePath +
-696 " first=" + Bytes.toStringBinary(first) +
-697 " last=" + Bytes.toStringBinary(last));
-698 if (first == null || last == null) {
-699 assert first == null && last == null;
-700 // TODO what if this is due to a bad HFile?
-701 LOG.info("hfile " + hfilePath + " has no entries, skipping");
-702 return null;
-703 }
-704 if (Bytes.compareTo(first, last) > 0) {
-705 throw new IllegalArgumentException(
-706 "Invalid range: " + Bytes.toStringBinary(first) +
-707 " > " + Bytes.toStringBinary(last));
-708 }
-709 int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
-710 Bytes.BYTES_COMPARATOR);
-711 if (idx < 0) {
-712 // not on boundary, returns -(insertion index). Calculate region it
-713 // would be in.
-714 idx = -(idx + 1) - 1;
-715 }
-716 final int indexForCallable = idx;
-717
-718 /**
-719 * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
-720 * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
-721 * region. 3) if the endkey of the last region is not empty.
-722 */
-723 if (indexForCallable < 0) {
-724 throw new IOException("The first region info for table "
-725 + table.getName()
-726 + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
-727 } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
-728 && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
-729 throw new IOException("The last region info for table "
-730 + table.getName()
-731 + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
-732 } else if (indexForCallable + 1 < startEndKeys.getFirst().length
-733 && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
-734 startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
-735 throw new IOException("The endkey of one region for table "
-736 + table.getName()
-737 + " is not equal to the startkey of the next region in hbase:meta."
-738 + "Please use hbck tool to fix it first.");
-739 }
-740
-741 boolean lastKeyInRange =
-742 Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
-743 Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
-744 if (!lastKeyInRange) {
-745 List<LoadQueueItem> lqis = splitStoreFile(item, table,
-746 startEndKeys.getFirst()[indexForCallable],
-747 startEndKeys.getSecond()[indexForCallable]);
-748 return lqis;
-749 }
-750
-751 // group regions.
-752 regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
-753 return null;
-754 }
-755
-756 /**
-757 * Attempts to do an atomic load of many hfiles into a region. If it fails,
-758 * it returns a list of hfiles that need to be retried. If it is successful
-759 * it will return an empty list.
-760 *
-761 * NOTE: To maintain row atomicity guarantees, region server callable should
-762 * succeed atomically and fails atomically.
-763 *
-764 * Protected for testing.
-765 *
-766 * @return empty list if success, list of items to retry on recoverable
-767 * failure
-768 */
-769 protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-770 final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
-771 throws IOException {
-772 final List<Pair<byte[], String>> famPaths =
-773 new ArrayList<Pair<byte[], String>>(lqis.size());
-774 for (LoadQueueItem lqi : lqis) {
-775 famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
-776 }
-777
-778 final RegionServerCallable<Boolean> svrCallable =
-779 new RegionServerCallable<Boolean>(conn, tableName, first) {
-780 @Override
-781 public Boolean call(int callTimeout) throws Exception {
-782 SecureBulkLoadClient secureClient = null;
-783 boolean success = false;
-784
-785 try {
-786 LOG.debug("Going to connect to server " + getLocation() + " for row "
-787 + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
-788 byte[] regionName = getLocation().getRegionInfo().getRegionName();
-789 if (!isSecureBulkLoadEndpointAvailable()) {
-790 success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
-791 } else {
-792 try (Table table = conn.getTable(getTableName())) {
-793 secureClient = new SecureBulkLoadClient(table);
-794 success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
-795 bulkToken, getLocation().getRegionInfo().getStartKey());
-796 }
-797 }
-798 return success;
-799 } finally {
-800 //Best effort copying of files that might not have been imported
-801 //from the staging directory back to original location
-802 //in user directory
-803 if(secureClient != null && !success) {
-804 FileSystem targetFs = FileSystem.get(getConf());
-805 // fs is the source filesystem
-806 if(fs == null) {
-807 fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
-808 }
-809 // Check to see if the source and target filesystems are the same
-810 // If they are the same filesystem, we will try move the files back
-811 // because previously we moved them to the staging directory.
-812 if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
-813 for(Pair<byte[], String> el : famPaths) {
-814 Path hfileStagingPath = null;
-815 Path hfileOrigPath = new Path(el.getSecond());
-816 try {
-817 hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
-818 hfileOrigPath.getName());
-819 if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
-820 LOG.debug("Moved back file " + hfileOrigPath + " from " +
-821 hfileStagingPath);
-822 } else if(targetFs.exists(hfileStagingPath)){
-823 LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-824 hfileStagingPath);
-825 }
-826 } catch(Exception ex) {
-827 LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-828 hfileStagingPath, ex);
-829 }
-830 }
-831 }
-832 }
-833 }
-834 }
-835 };
-836
-837 try {
-838 List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
-839 Configuration conf = getConf();
-840 boolean success = RpcRetryingCallerFactory.instantiate(conf,
-841 null).<Boolean> newCaller()
-842 .callWithRetries(svrCallable, Integer.MAX_VALUE);
-843 if (!success) {
-844 LOG.warn("Attempt to bulk load region containing "
-845 + Bytes.toStringBinary(first) + " into table "
-846 + tableName + " with files " + lqis
-847 + " failed. This is recoverable and they will be retried.");
-848 toRetry.addAll(lqis); // return lqi's to retry
-849 }
-850 // success
-851 return toRetry;
-852 } catch (IOException e) {
-853 LOG.error("Encountered unrecoverable error from region server, additional details: "
-854 + svrCallable.getExceptionMessageAdditionalDetail(), e);
-855 throw e;
-856 }
-857 }
-858
-859 private boolean isSecureBulkLoadEndpointAvailable() {
-860 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
-861 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-862 }
-863
-864 /**
-865 * Split a storefile into a top and bottom half, maintaining
-866 * the metadata, recreating bloom filters, etc.
-867 */
-868 static void splitStoreFile(
-869 Configuration conf, Path inFile,
-870 HColumnDescriptor familyDesc, byte[] splitKey,
-871 Path bottomOut, Path topOut) throws IOException
-872 {
-873 // Open reader with no block cache, and not in-memory
-874 Reference topReference = Reference.createTopReference(splitKey);
-875 Reference bottomReference = Reference.createBottomReference(splitKey);
-876
-877 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
-878 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
-879 }
-880
-881 /**
-882 * Copy half of an HFile into a new HFile.
-883 */
-884 private static void copyHFileHalf(
-885 Configuration conf, Path inFile, Path outFile, Reference reference,
-886 HColumnDescriptor familyDescriptor)
-887 throws IOException {
-888 FileSystem fs = inFile.getFileSystem(conf);
-889 CacheConfig cacheConf = new CacheConfig(conf);
-890 HalfStoreFileReader halfReader = null;
-891 StoreFile.Writer halfWriter = null;
-892 try {
-893 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
-894 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
-895
-896 int blocksize = familyDescriptor.getBlocksize();
-897 Algorithm compression = familyDescriptor.getCompressionType();
-898 BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
-899 HFileContext hFileContext = new HFileContextBuilder()
-900 .withCompression(compression)
-901 .withChecksumType(HStore.getChecksumType(conf))
-902 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-903 .withBlockSize(blocksize)
-904 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
-905 .build();
-906 halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
-907 fs)
-908 .withFilePath(outFile)
-909 .withBloomType(bloomFilterType)
-910 .withFileContext(hFileContext)
-911 .build();
-912 HFileScanner scanner = halfReader.getScanner(false, false, false);
-913 scanner.seekTo();
-914 do {
-915 halfWriter.append(scanner.getCell());
-916 } while (scanner.next());
-917
-918 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
-919 if (shouldCopyHFileMetaKey(entry.getKey())) {
-920 halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
-921 }
-922 }
-923 } finally {
-924 if (halfWriter != null) halfWriter.close();
-925 if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
-926 }
-927 }
-928
-929 private static boolean shouldCopyHFileMetaKey(byte[] key) {
-930 return !HFile.isReservedFileInfoKey(key);
-931 }
-932
-933 /*
-934 * Infers region boundaries for a new table.
-935 * Parameter:
-936 * bdryMap is a map between keys to an integer belonging to {+1, -1}
-937 * If a key is a start key of a file, then it maps to +1
-938 * If a key is an end key of a file, then it maps to -1
-939 * Algo:
-940 * 1) Poll on the keys in order:
-941 * a) Keep adding the mapped values to these keys (runningSum)
-942 * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to
-943 * a boundary list.
-944 * 2) Return the boundary list.
-945 */
-946 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
-947 ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
-948 int runningValue = 0;
-949 byte[] currStartKey = null;
-950 boolean firstBoundary = true;
-951
-952 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
-953 if (runningValue == 0) currStartKey = item.getKey();
-954 runningValue += item.getValue();
-955 if (runningValue == 0) {
-956 if (!firstBoundary) keysArray.add(currStartKey);
-957 firstBoundary = false;
-958 }
-959 }
-960
-961 return keysArray.toArray(new byte[0][0]);
-962 }
-963
-964 /*
-965 * If the table is created for the first time, then "completebulkload" reads the files twice.
-966 * More modifications necessary if we want to avoid doing it.
-967 */
-968 private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception {
-969 final Path hfofDir = new Path(dirPath);
-970 final FileSystem fs = hfofDir.getFileSystem(getConf());
-971
-972 // Add column families
-973 // Build a set of keys
-974 final HTableDescriptor htd = new HTableDescriptor(tableName);
-975 final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-976 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
-977 @Override
-978 public HColumnDescriptor bulkFamily(final byte[] familyName) {
-979 HColumnDescriptor hcd = new HColumnDescriptor(familyName);
-980 htd.addFamily(hcd);
-981 return hcd;
-982 }
-983 @Override
-984 public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
-985 throws IOException {
-986 Path hfile = hfileStatus.getPath();
-987 HFile.Reader reader = HFile.createReader(fs, hfile,
-988 new CacheConfig(getConf()), getConf());
-989 try {
-990 if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
-991 hcd.setCompressionType(reader.getFileContext().getCompression());
-992 LOG.info("Setting compression " + hcd.getCompressionType().name() +
-993 " for family " + hcd.toString());
-994 }
-995 reader.loadFileInfo();
-996 byte[] first = reader.getFirstRowKey();
-997 byte[] last = reader.getLastRowKey();
-998
-999 LOG.info("Trying to figure out region boundaries hfile=" + hfile +
-1000 " first=" + Bytes.toStringBinary(first) +
-1001 " last=" + Bytes.toStringBinary(last));
-1002
-1003 // To eventually infer start key-end key boundaries
-1004 Integer value = map.containsKey(first)? map.get(first):0;
-1005 map.put(first, value+1);
-1006
-1007 value = map.containsKey(last)? map.get(last):0;
-1008 map.put(last, value-1);
-1009 } finally {
-1010 reader.close();
-1011 }
-1012 }
-1013 });
-1014
-1015 byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
-1016 admin.createTable(htd, keys);
-1017
-1018 LOG.info("Table "+ tableName +" is available!!");
-1019 }
-1020
-1021 @Override
-1022 public int run(String[] args) throws Exception {
-1023 if (args.length != 2) {
-1024 usage();
-1025 return -1;
-1026 }
-1027
-1028 initialize();
-1029 try (Connection connection = ConnectionFactory.createConnection(getConf());
-1030 Admin admin = connection.getAdmin()) {
-1031 String dirPath = args[0];
-1032 TableName tableName = TableName.valueOf(args[1]);
-1033
-1034 boolean tableExists = admin.tableExists(tableName);
-1035 if (!tableExists) {
-1036 if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
-1037 this.createTable(tableName, dirPath, admin);
-1038 } else {
-1039 String errorMsg = format("Table '%s' does not exist.", tableName);
-1040 LOG.error(errorMsg);
-1041 throw new TableNotFoundException(errorMsg);
-1042 }
-1043 }
-1044
-1045 Path hfofDir = new Path(dirPath);
-1046
-1047 try (Table table = connection.getTable(tableName);
-1048 RegionLocator locator = connection.getRegionLocator(tableName)) {
-1049 doBulkLoad(hfofDir, admin, table, locator);
-1050 }
-1051 }
-1052
-1053 return 0;
-1054 }
-1055
-1056 public static void main(String[] args) throws Exception {
-1057 Configuration conf = HBaseConfiguration.create();
-1058 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
-1059 System.exit(ret);
-1060 }
-1061
-1062 /**
-1063 * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-1064 * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
-1065 * property. This directory is used as a temporary directory where all files are initially
-1066 * copied/moved from user given directory, set all the required file permissions and then from
-1067 * their it is finally loaded into a table. This should be set only when, one would like to manage
-1068 * the staging directory by itself. Otherwise this tool will handle this by itself.
-1069 * @param stagingDir staging directory path
-1070 */
-1071 public void setBulkToken(String stagingDir) {
-1072 this.bulkToken = stagingDir;
-1073 }
-1074
-1075 }
+428 * @param validateHFile if true hfiles will be validated for its format
+429 * @throws IOException If any I/O or network error occurred
+430 */
+431 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+432 boolean validateHFile) throws IOException {
+433 discoverLoadQueue(queue, hfilesDir, validateHFile);
+434 validateFamiliesInHFiles(table, queue);
+435 }
+436
+437 // Initialize a thread pool
+438 private ExecutorService createExecutorService() {
+439 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+440 builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+441 ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+442 new LinkedBlockingQueue<Runnable>(), builder.build());
+443 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+444 return pool;
+445 }
+446
+447 /**
+448 * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+449
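
For context, the two public entry points touched by this hunk (prepareHFileQueue and loadHFileQueue) are the programmatic path used by the replication sink. Below is a minimal, illustrative sketch of driving them directly against the 1.x client API shown in the diff; the table name "my_table" and the HFile directory "/tmp/hfiles" are placeholders, and the LoadQueueItem nesting is assumed from this version of the class, not guaranteed by this commit.

    import java.util.ArrayDeque;
    import java.util.Deque;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
    import org.apache.hadoop.hbase.util.Pair;

    public class BulkLoadQueueExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("my_table");   // illustrative
        Path hfileDir = new Path("/tmp/hfiles");                // illustrative

        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        Deque<LoadQueueItem> queue = new ArrayDeque<LoadQueueItem>();

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(tableName);
             RegionLocator locator = conn.getRegionLocator(tableName)) {
          // Discover HFiles under hfileDir and validate their families against the table.
          loader.prepareHFileQueue(hfileDir, table, queue, true);
          // Group by region (splitting HFiles that straddle regions) and bulk load atomically.
          Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
          loader.loadHFileQueue(table, conn, queue, startEndKeys);
        }
      }
    }

The command-line route in run()/main() above covers the common case instead: invoking the class as a Tool with two arguments, the HFile directory and the table name, lets doBulkLoad() handle queue preparation, grouping, and retries itself.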