From: misty@apache.org
To: commits@hbase.apache.org
Date: Sun, 29 Nov 2015 21:17:46 -0000
Subject: [33/51] [partial] hbase git commit: Published site at 3bac31b2a49bca153df3b47a198667828b61f36e.

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7355e1/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.LoadQueueItem.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.LoadQueueItem.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.LoadQueueItem.html
index 7b73bcb..2eb4156 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.LoadQueueItem.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.LoadQueueItem.html
@@ -529,487 +529,486 @@ 521 } 522 523 /** -524 * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely -525 * bulk load region targets. -526 */ -527 private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table, -528 ExecutorService pool, Deque<LoadQueueItem> queue, -529 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { -530 // <region start key, LQI> need synchronized only within this scope of this -531 // phase because of the puts that happen in futures. -532 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); -533 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); -534 -535 // drain LQIs and figure out bulk load groups -536 Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>(); -537 while (!queue.isEmpty()) { -538 final LoadQueueItem item = queue.remove(); -539 -540 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { -541 @Override -542 public List<LoadQueueItem> call() throws Exception { -543 List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys); -544 return splits; -545 } -546 }; -547 splittingFutures.add(pool.submit(call)); -548 } -549 // get all the results.
All grouping and splitting must finish before -550 // we can attempt the atomic loads. -551 for (Future<List<LoadQueueItem>> lqis : splittingFutures) { -552 try { -553 List<LoadQueueItem> splits = lqis.get(); -554 if (splits != null) { -555 queue.addAll(splits); -556 } -557 } catch (ExecutionException e1) { -558 Throwable t = e1.getCause(); -559 if (t instanceof IOException) { -560 LOG.error("IOException during splitting", e1); -561 throw (IOException)t; // would have been thrown if not parallelized, -562 } -563 LOG.error("Unexpected execution exception during splitting", e1); -564 throw new IllegalStateException(t); -565 } catch (InterruptedException e1) { -566 LOG.error("Unexpected interrupted exception during splitting", e1); -567 throw (InterruptedIOException)new InterruptedIOException().initCause(e1); -568 } -569 } -570 return regionGroups; -571 } -572 -573 // unique file name for the table -574 private String getUniqueName() { -575 return UUID.randomUUID().toString().replaceAll("-", ""); -576 } -577 -578 protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, -579 final Table table, byte[] startKey, -580 byte[] splitKey) throws IOException { -581 final Path hfilePath = item.hfilePath; -582 -583 // We use a '_' prefix which is ignored when walking directory trees -584 // above. -585 final String TMP_DIR = "_tmp"; -586 Path tmpDir = item.hfilePath.getParent(); -587 if (!tmpDir.getName().equals(TMP_DIR)) { -588 tmpDir = new Path(tmpDir, TMP_DIR); -589 } -590 -591 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + -592 "region. Splitting..."); -593 -594 String uniqueName = getUniqueName(); -595 HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family); -596 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); -597 Path topOut = new Path(tmpDir, uniqueName + ".top"); -598 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, -599 botOut, topOut); -600 -601 FileSystem fs = tmpDir.getFileSystem(getConf()); -602 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); -603 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); -604 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); -605 -606 // Add these back at the *front* of the queue, so there's a lower -607 // chance that the region will just split again before we get there. -608 List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2); -609 lqis.add(new LoadQueueItem(item.family, botOut)); -610 lqis.add(new LoadQueueItem(item.family, topOut)); -611 -612 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); -613 return lqis; -614 } -615 -616 /** -617 * Attempt to assign the given load queue item into its target region group. -618 * If the hfile boundary no longer fits into a region, physically splits -619 * the hfile such that the new bottom half will fit and returns the list of -620 * LQI's corresponding to the resultant hfiles. 
-621 * -622 * protected for testing -623 * @throws IOException -624 */ -625 protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, -626 final LoadQueueItem item, final Table table, -627 final Pair<byte[][], byte[][]> startEndKeys) -628 throws IOException { -629 final Path hfilePath = item.hfilePath; -630 HFile.Reader hfr = HFile.createReader(fs, hfilePath, -631 new CacheConfig(getConf()), getConf()); -632 final byte[] first, last; -633 try { -634 hfr.loadFileInfo(); -635 first = hfr.getFirstRowKey(); -636 last = hfr.getLastRowKey(); -637 } finally { -638 hfr.close(); -639 } -640 -641 LOG.info("Trying to load hfile=" + hfilePath + -642 " first=" + Bytes.toStringBinary(first) + -643 " last=" + Bytes.toStringBinary(last)); -644 if (first == null || last == null) { -645 assert first == null && last == null; -646 // TODO what if this is due to a bad HFile? -647 LOG.info("hfile " + hfilePath + " has no entries, skipping"); -648 return null; -649 } -650 if (Bytes.compareTo(first, last) > 0) { -651 throw new IllegalArgumentException( -652 "Invalid range: " + Bytes.toStringBinary(first) + -653 " > " + Bytes.toStringBinary(last)); -654 } -655 int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, -656 Bytes.BYTES_COMPARATOR); -657 if (idx < 0) { -658 // not on boundary, returns -(insertion index). Calculate region it -659 // would be in. -660 idx = -(idx + 1) - 1; -661 } -662 final int indexForCallable = idx; -663 -664 /** -665 * we can consider there is a region hole in following conditions. 1) if idx < 0,then first -666 * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next -667 * region. 3) if the endkey of the last region is not empty. -668 */ -669 if (indexForCallable < 0) { -670 throw new IOException("The first region info for table " -671 + table.getName() -672 + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); -673 } else if ((indexForCallable == startEndKeys.getFirst().length - 1) -674 && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { -675 throw new IOException("The last region info for table " -676 + table.getName() -677 + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); -678 } else if (indexForCallable + 1 < startEndKeys.getFirst().length -679 && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], -680 startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { -681 throw new IOException("The endkey of one region for table " -682 + table.getName() -683 + " is not equal to the startkey of the next region in hbase:meta." -684 + "Please use hbck tool to fix it first."); -685 } -686 -687 boolean lastKeyInRange = -688 Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 || -689 Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY); -690 if (!lastKeyInRange) { -691 List<LoadQueueItem> lqis = splitStoreFile(item, table, -692 startEndKeys.getFirst()[indexForCallable], -693 startEndKeys.getSecond()[indexForCallable]); -694 return lqis; -695 } -696 -697 // group regions. -698 regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item); -699 return null; -700 } -701 -702 /** -703 * Attempts to do an atomic load of many hfiles into a region. If it fails, -704 * it returns a list of hfiles that need to be retried. If it is successful -705 * it will return an empty list. 
-706 * -707 * NOTE: To maintain row atomicity guarantees, region server callable should -708 * succeed atomically and fails atomically. -709 * -710 * Protected for testing. -711 * -712 * @return empty list if success, list of items to retry on recoverable -713 * failure -714 */ -715 protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, -716 final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) -717 throws IOException { -718 final List<Pair<byte[], String>> famPaths = -719 new ArrayList<Pair<byte[], String>>(lqis.size()); -720 for (LoadQueueItem lqi : lqis) { -721 famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); -722 } -723 -724 final RegionServerCallable<Boolean> svrCallable = -725 new RegionServerCallable<Boolean>(conn, tableName, first) { -726 @Override -727 public Boolean call(int callTimeout) throws Exception { -728 SecureBulkLoadClient secureClient = null; -729 boolean success = false; -730 -731 try { -732 LOG.debug("Going to connect to server " + getLocation() + " for row " -733 + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); -734 byte[] regionName = getLocation().getRegionInfo().getRegionName(); -735 if (!isSecureBulkLoadEndpointAvailable()) { -736 success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); -737 } else { -738 try (Table table = conn.getTable(getTableName())) { -739 secureClient = new SecureBulkLoadClient(table); -740 success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), -741 bulkToken, getLocation().getRegionInfo().getStartKey()); -742 } -743 } -744 return success; -745 } finally { -746 //Best effort copying of files that might not have been imported -747 //from the staging directory back to original location -748 //in user directory -749 if(secureClient != null && !success) { -750 FileSystem targetFs = FileSystem.get(getConf()); -751 // Check to see if the source and target filesystems are the same -752 // If they are the same filesystem, we will try move the files back -753 // because previously we moved them to the staging directory. -754 if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) { -755 for(Pair<byte[], String> el : famPaths) { -756 Path hfileStagingPath = null; -757 Path hfileOrigPath = new Path(el.getSecond()); -758 try { -759 hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()), -760 hfileOrigPath.getName()); -761 if(targetFs.rename(hfileStagingPath, hfileOrigPath)) { -762 LOG.debug("Moved back file " + hfileOrigPath + " from " + -763 hfileStagingPath); -764 } else if(targetFs.exists(hfileStagingPath)){ -765 LOG.debug("Unable to move back file " + hfileOrigPath + " from " + -766 hfileStagingPath); -767 } -768 } catch(Exception ex) { -769 LOG.debug("Unable to move back file " + hfileOrigPath + " from " + -770 hfileStagingPath, ex); -771 } -772 } -773 } -774 } -775 } -776 } -777 }; -778 -779 try { -780 List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>(); -781 Configuration conf = getConf(); -782 boolean success = RpcRetryingCallerFactory.instantiate(conf, -783 null).<Boolean> newCaller() -784 .callWithRetries(svrCallable, Integer.MAX_VALUE); -785 if (!success) { -786 LOG.warn("Attempt to bulk load region containing " -787 + Bytes.toStringBinary(first) + " into table " -788 + tableName + " with files " + lqis -789 + " failed. 
This is recoverable and they will be retried."); -790 toRetry.addAll(lqis); // return lqi's to retry -791 } -792 // success -793 return toRetry; -794 } catch (IOException e) { -795 LOG.error("Encountered unrecoverable error from region server, additional details: " -796 + svrCallable.getExceptionMessageAdditionalDetail(), e); -797 throw e; -798 } -799 } -800 -801 private boolean isSecureBulkLoadEndpointAvailable() { -802 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); -803 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); -804 } -805 -806 /** -807 * Split a storefile into a top and bottom half, maintaining -808 * the metadata, recreating bloom filters, etc. -809 */ -810 static void splitStoreFile( -811 Configuration conf, Path inFile, -812 HColumnDescriptor familyDesc, byte[] splitKey, -813 Path bottomOut, Path topOut) throws IOException -814 { -815 // Open reader with no block cache, and not in-memory -816 Reference topReference = Reference.createTopReference(splitKey); -817 Reference bottomReference = Reference.createBottomReference(splitKey); -818 -819 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); -820 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); -821 } -822 -823 /** -824 * Copy half of an HFile into a new HFile. -825 */ -826 private static void copyHFileHalf( -827 Configuration conf, Path inFile, Path outFile, Reference reference, -828 HColumnDescriptor familyDescriptor) -829 throws IOException { -830 FileSystem fs = inFile.getFileSystem(conf); -831 CacheConfig cacheConf = new CacheConfig(conf); -832 HalfStoreFileReader halfReader = null; -833 StoreFile.Writer halfWriter = null; -834 try { -835 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf); -836 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); -837 -838 int blocksize = familyDescriptor.getBlocksize(); -839 Algorithm compression = familyDescriptor.getCompressionType(); -840 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); -841 HFileContext hFileContext = new HFileContextBuilder() -842 .withCompression(compression) -843 .withChecksumType(HStore.getChecksumType(conf)) -844 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) -845 .withBlockSize(blocksize) -846 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()) -847 .build(); -848 halfWriter = new StoreFile.WriterBuilder(conf, cacheConf, -849 fs) -850 .withFilePath(outFile) -851 .withBloomType(bloomFilterType) -852 .withFileContext(hFileContext) -853 .build(); -854 HFileScanner scanner = halfReader.getScanner(false, false, false); -855 scanner.seekTo(); -856 do { -857 halfWriter.append(scanner.getCell()); -858 } while (scanner.next()); -859 -860 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) { -861 if (shouldCopyHFileMetaKey(entry.getKey())) { -862 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); -863 } -864 } -865 } finally { -866 if (halfWriter != null) halfWriter.close(); -867 if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose()); -868 } -869 } -870 -871 private static boolean shouldCopyHFileMetaKey(byte[] key) { -872 return !HFile.isReservedFileInfoKey(key); -873 } -874 -875 /* -876 * Infers region boundaries for a new table. 
-877 * Parameter: -878 * bdryMap is a map between keys to an integer belonging to {+1, -1} -879 * If a key is a start key of a file, then it maps to +1 -880 * If a key is an end key of a file, then it maps to -1 -881 * Algo: -882 * 1) Poll on the keys in order: -883 * a) Keep adding the mapped values to these keys (runningSum) -884 * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to -885 * a boundary list. -886 * 2) Return the boundary list. -887 */ -888 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) { -889 ArrayList<byte[]> keysArray = new ArrayList<byte[]>(); -890 int runningValue = 0; -891 byte[] currStartKey = null; -892 boolean firstBoundary = true; -893 -894 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) { -895 if (runningValue == 0) currStartKey = item.getKey(); -896 runningValue += item.getValue(); -897 if (runningValue == 0) { -898 if (!firstBoundary) keysArray.add(currStartKey); -899 firstBoundary = false; -900 } -901 } -902 -903 return keysArray.toArray(new byte[0][0]); -904 } -905 -906 /* -907 * If the table is created for the first time, then "completebulkload" reads the files twice. -908 * More modifications necessary if we want to avoid doing it. -909 */ -910 private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { -911 final Path hfofDir = new Path(dirPath); -912 final FileSystem fs = hfofDir.getFileSystem(getConf()); -913 -914 // Add column families -915 // Build a set of keys -916 final HTableDescriptor htd = new HTableDescriptor(tableName); -917 final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); -918 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() { -919 @Override -920 public HColumnDescriptor bulkFamily(final byte[] familyName) { -921 HColumnDescriptor hcd = new HColumnDescriptor(familyName); -922 htd.addFamily(hcd); -923 return hcd; -924 } -925 @Override -926 public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus) -927 throws IOException { -928 Path hfile = hfileStatus.getPath(); -929 HFile.Reader reader = HFile.createReader(fs, hfile, -930 new CacheConfig(getConf()), getConf()); -931 try { -932 if (hcd.getCompressionType() != reader.getFileContext().getCompression()) { -933 hcd.setCompressionType(reader.getFileContext().getCompression()); -934 LOG.info("Setting compression " + hcd.getCompressionType().name() + -935 " for family " + hcd.toString()); -936 } -937 reader.loadFileInfo(); -938 byte[] first = reader.getFirstRowKey(); -939 byte[] last = reader.getLastRowKey(); -940 -941 LOG.info("Trying to figure out region boundaries hfile=" + hfile + -942 " first=" + Bytes.toStringBinary(first) + -943 " last=" + Bytes.toStringBinary(last)); -944 -945 // To eventually infer start key-end key boundaries -946 Integer value = map.containsKey(first)? map.get(first):0; -947 map.put(first, value+1); -948 -949 value = map.containsKey(last)? 
map.get(last):0; -950 map.put(last, value-1); -951 } finally { -952 reader.close(); -953 } -954 } -955 }); -956 -957 byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map); -958 admin.createTable(htd, keys); -959 -960 LOG.info("Table "+ tableName +" is available!!"); -961 } -962 -963 @Override -964 public int run(String[] args) throws Exception { -965 if (args.length != 2) { -966 usage(); -967 return -1; -968 } -969 -970 initialize(); -971 try (Connection connection = ConnectionFactory.createConnection(getConf()); -972 Admin admin = connection.getAdmin()) { -973 String dirPath = args[0]; -974 TableName tableName = TableName.valueOf(args[1]); -975 -976 boolean tableExists = admin.tableExists(tableName); -977 if (!tableExists) { -978 if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) { -979 this.createTable(tableName, dirPath, admin); -980 } else { -981 String errorMsg = format("Table '%s' does not exist.", tableName); -982 LOG.error(errorMsg); -983 throw new TableNotFoundException(errorMsg); -984 } -985 } -986 -987 Path hfofDir = new Path(dirPath); -988 -989 try (Table table = connection.getTable(tableName); -990 RegionLocator locator = connection.getRegionLocator(tableName)) { -991 doBulkLoad(hfofDir, admin, table, locator); -992 } -993 } -994 -995 return 0; -996 } -997 -998 public static void main(String[] args) throws Exception { -999 Configuration conf = HBaseConfiguration.create(); -1000 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args); -1001 System.exit(ret); -1002 } -1003 -1004} +524 * @return A map that groups LQI by likely bulk load region targets. +525 */ +526 private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table, +527 ExecutorService pool, Deque<LoadQueueItem> queue, +528 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { +529 // <region start key, LQI> need synchronized only within this scope of this +530 // phase because of the puts that happen in futures. +531 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); +532 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); +533 +534 // drain LQIs and figure out bulk load groups +535 Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>(); +536 while (!queue.isEmpty()) { +537 final LoadQueueItem item = queue.remove(); +538 +539 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { +540 @Override +541 public List<LoadQueueItem> call() throws Exception { +542 List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys); +543 return splits; +544 } +545 }; +546 splittingFutures.add(pool.submit(call)); +547 } +548 // get all the results. All grouping and splitting must finish before +549 // we can attempt the atomic loads. 
+550 for (Future<List<LoadQueueItem>> lqis : splittingFutures) { +551 try { +552 List<LoadQueueItem> splits = lqis.get(); +553 if (splits != null) { +554 queue.addAll(splits); +555 } +556 } catch (ExecutionException e1) { +557 Throwable t = e1.getCause(); +558 if (t instanceof IOException) { +559 LOG.error("IOException during splitting", e1); +560 throw (IOException)t; // would have been thrown if not parallelized, +561 } +562 LOG.error("Unexpected execution exception during splitting", e1); +563 throw new IllegalStateException(t); +564 } catch (InterruptedException e1) { +565 LOG.error("Unexpected interrupted exception during splitting", e1); +566 throw (InterruptedIOException)new InterruptedIOException().initCause(e1); +567 } +568 } +569 return regionGroups; +570 } +571 +572 // unique file name for the table +573 private String getUniqueName() { +574 return UUID.randomUUID().toString().replaceAll("-", ""); +575 } +576 +577 protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, +578 final Table table, byte[] startKey, +579 byte[] splitKey) throws IOException { +580 final Path hfilePath = item.hfilePath; +581 +582 // We use a '_' prefix which is ignored when walking directory trees +583 // above. +584 final String TMP_DIR = "_tmp"; +585 Path tmpDir = item.hfilePath.getParent(); +586 if (!tmpDir.getName().equals(TMP_DIR)) { +587 tmpDir = new Path(tmpDir, TMP_DIR); +588 } +589 +590 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + +591 "region. Splitting..."); +592 +593 String uniqueName = getUniqueName(); +594 HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family); +595 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); +596 Path topOut = new Path(tmpDir, uniqueName + ".top"); +597 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, +598 botOut, topOut); +599 +600 FileSystem fs = tmpDir.getFileSystem(getConf()); +601 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); +602 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); +603 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); +604 +605 // Add these back at the *front* of the queue, so there's a lower +606 // chance that the region will just split again before we get there. +607 List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2); +608 lqis.add(new LoadQueueItem(item.family, botOut)); +609 lqis.add(new LoadQueueItem(item.family, topOut)); +610 +611 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); +612 return lqis; +613 } +614 +615 /** +616 * Attempt to assign the given load queue item into its target region group. +617 * If the hfile boundary no longer fits into a region, physically splits +618 * the hfile such that the new bottom half will fit and returns the list of +619 * LQI's corresponding to the resultant hfiles. 
+620 * +621 * protected for testing +622 * @throws IOException +623 */ +624 protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, +625 final LoadQueueItem item, final Table table, +626 final Pair<byte[][], byte[][]> startEndKeys) +627 throws IOException { +628 final Path hfilePath = item.hfilePath; +629 HFile.Reader hfr = HFile.createReader(fs, hfilePath, +630 new CacheConfig(getConf()), getConf()); +631 final byte[] first, last; +632 try { +633 hfr.loadFileInfo(); +634 first = hfr.getFirstRowKey(); +635 last = hfr.getLastRowKey(); +636 } finally { +637 hfr.close(); +638 } +639 +640 LOG.info("Trying to load hfile=" + hfilePath + +641 " first=" + Bytes.toStringBinary(first) + +642 " last=" + Bytes.toStringBinary(last)); +643 if (first == null || last == null) { +644 assert first == null && last == null; +645 // TODO what if this is due to a bad HFile? +646 LOG.info("hfile " + hfilePath + " has no entries, skipping"); +647 return null; +648 } +649 if (Bytes.compareTo(first, last) > 0) { +650 throw new IllegalArgumentException( +651 "Invalid range: " + Bytes.toStringBinary(first) + +652 " > " + Bytes.toStringBinary(last)); +653 } +654 int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, +655 Bytes.BYTES_COMPARATOR); +656 if (idx < 0) { +657 // not on boundary, returns -(insertion index). Calculate region it +658 // would be in. +659 idx = -(idx + 1) - 1; +660 } +661 final int indexForCallable = idx; +662 +663 /** +664 * we can consider there is a region hole in following conditions. 1) if idx < 0,then first +665 * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next +666 * region. 3) if the endkey of the last region is not empty. +667 */ +668 if (indexForCallable < 0) { +669 throw new IOException("The first region info for table " +670 + table.getName() +671 + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); +672 } else if ((indexForCallable == startEndKeys.getFirst().length - 1) +673 && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { +674 throw new IOException("The last region info for table " +675 + table.getName() +676 + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); +677 } else if (indexForCallable + 1 < startEndKeys.getFirst().length +678 && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], +679 startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { +680 throw new IOException("The endkey of one region for table " +681 + table.getName() +682 + " is not equal to the startkey of the next region in hbase:meta." +683 + "Please use hbck tool to fix it first."); +684 } +685 +686 boolean lastKeyInRange = +687 Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 || +688 Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY); +689 if (!lastKeyInRange) { +690 List<LoadQueueItem> lqis = splitStoreFile(item, table, +691 startEndKeys.getFirst()[indexForCallable], +692 startEndKeys.getSecond()[indexForCallable]); +693 return lqis; +694 } +695 +696 // group regions. +697 regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item); +698 return null; +699 } +700 +701 /** +702 * Attempts to do an atomic load of many hfiles into a region. If it fails, +703 * it returns a list of hfiles that need to be retried. If it is successful +704 * it will return an empty list. 
+705 * +706 * NOTE: To maintain row atomicity guarantees, region server callable should +707 * succeed atomically and fails atomically. +708 * +709 * Protected for testing. +710 * +711 * @return empty list if success, list of items to retry on recoverable +712 * failure +713 */ +714 protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, +715 final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) +716 throws IOException { +717 final List<Pair<byte[], String>> famPaths = +718 new ArrayList<Pair<byte[], String>>(lqis.size()); +719 for (LoadQueueItem lqi : lqis) { +720 famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); +721 } +722 +723 final RegionServerCallable<Boolean> svrCallable = +724 new RegionServerCallable<Boolean>(conn, tableName, first) { +725 @Override +726 public Boolean call(int callTimeout) throws Exception { +727 SecureBulkLoadClient secureClient = null; +728 boolean success = false; +729 +730 try { +731 LOG.debug("Going to connect to server " + getLocation() + " for row " +732 + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); +733 byte[] regionName = getLocation().getRegionInfo().getRegionName(); +734 if (!isSecureBulkLoadEndpointAvailable()) { +735 success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); +736 } else { +737 try (Table table = conn.getTable(getTableName())) { +738 secureClient = new SecureBulkLoadClient(table); +739 success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), +740 bulkToken, getLocation().getRegionInfo().getStartKey()); +741 } +742 } +743 return success; +744 } finally { +745 //Best effort copying of files that might not have been imported +746 //from the staging directory back to original location +747 //in user directory +748 if(secureClient != null && !success) { +749 FileSystem targetFs = FileSystem.get(getConf()); +750 // Check to see if the source and target filesystems are the same +751 // If they are the same filesystem, we will try move the files back +752 // because previously we moved them to the staging directory. +753 if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) { +754 for(Pair<byte[], String> el : famPaths) { +755 Path hfileStagingPath = null; +756 Path hfileOrigPath = new Path(el.getSecond()); +757 try { +758 hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()), +759 hfileOrigPath.getName()); +760 if(targetFs.rename(hfileStagingPath, hfileOrigPath)) { +761 LOG.debug("Moved back file " + hfileOrigPath + " from " + +762 hfileStagingPath); +763 } else if(targetFs.exists(hfileStagingPath)){ +764 LOG.debug("Unable to move back file " + hfileOrigPath + " from " + +765 hfileStagingPath); +766 } +767 } catch(Exception ex) { +768 LOG.debug("Unable to move back file " + hfileOrigPath + " from " + +769 hfileStagingPath, ex); +770 } +771 } +772 } +773 } +774 } +775 } +776 }; +777 +778 try { +779 List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>(); +780 Configuration conf = getConf(); +781 boolean success = RpcRetryingCallerFactory.instantiate(conf, +782 null).<Boolean> newCaller() +783 .callWithRetries(svrCallable, Integer.MAX_VALUE); +784 if (!success) { +785 LOG.warn("Attempt to bulk load region containing " +786 + Bytes.toStringBinary(first) + " into table " +787 + tableName + " with files " + lqis +788 + " failed. 
This is recoverable and they will be retried."); +789 toRetry.addAll(lqis); // return lqi's to retry +790 } +791 // success +792 return toRetry; +793 } catch (IOException e) { +794 LOG.error("Encountered unrecoverable error from region server, additional details: " +795 + svrCallable.getExceptionMessageAdditionalDetail(), e); +796 throw e; +797 } +798 } +799 +800 private boolean isSecureBulkLoadEndpointAvailable() { +801 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); +802 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); +803 } +804 +805 /** +806 * Split a storefile into a top and bottom half, maintaining +807 * the metadata, recreating bloom filters, etc. +808 */ +809 static void splitStoreFile( +810 Configuration conf, Path inFile, +811 HColumnDescriptor familyDesc, byte[] splitKey, +812 Path bottomOut, Path topOut) throws IOException +813 { +814 // Open reader with no block cache, and not in-memory +815 Reference topReference = Reference.createTopReference(splitKey); +816 Reference bottomReference = Reference.createBottomReference(splitKey); +817 +818 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); +819 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); +820 } +821 +822 /** +823 * Copy half of an HFile into a new HFile. +824 */ +825 private static void copyHFileHalf( +826 Configuration conf, Path inFile, Path outFile, Reference reference, +827 HColumnDescriptor familyDescriptor) +828 throws IOException { +829 FileSystem fs = inFile.getFileSystem(conf); +830 CacheConfig cacheConf = new CacheConfig(conf); +831 HalfStoreFileReader halfReader = null; +832 StoreFile.Writer halfWriter = null; +833 try { +834 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf); +835 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); +836 +837 int blocksize = familyDescriptor.getBlocksize(); +838 Algorithm compression = familyDescriptor.getCompressionType(); +839 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); +840 HFileContext hFileContext = new HFileContextBuilder() +841 .withCompression(compression) +842 .withChecksumType(HStore.getChecksumType(conf)) +843 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) +844 .withBlockSize(blocksize) +845 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()) +846 .build(); +847 halfWriter = new StoreFile.WriterBuilder(conf, cacheConf, +848 fs) +849 .withFilePath(outFile) +850 .withBloomType(bloomFilterType) +851 .withFileContext(hFileContext) +852 .build(); +853 HFileScanner scanner = halfReader.getScanner(false, false, false); +854 scanner.seekTo(); +855 do { +856 halfWriter.append(scanner.getCell()); +857 } while (scanner.next()); +858 +859 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) { +860 if (shouldCopyHFileMetaKey(entry.getKey())) { +861 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); +862 } +863 } +864 } finally { +865 if (halfWriter != null) halfWriter.close(); +866 if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose()); +867 } +868 } +869 +870 private static boolean shouldCopyHFileMetaKey(byte[] key) { +871 return !HFile.isReservedFileInfoKey(key); +872 } +873 +874 /* +875 * Infers region boundaries for a new table. 
+876 * Parameter: +877 * bdryMap is a map between keys to an integer belonging to {+1, -1} +878 * If a key is a start key of a file, then it maps to +1 +879 * If a key is an end key of a file, then it maps to -1 +880 * Algo: +881 * 1) Poll on the keys in order: +882 * a) Keep adding the mapped values to these keys (runningSum) +883 * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to +884 * a boundary list. +885 * 2) Return the boundary list. +886 */ +887 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) { +888 ArrayList<byte[]> keysArray = new ArrayList<byte[]>(); +889 int runningValue = 0; +890 byte[] currStartKey = null; +891 boolean firstBoundary = true; +892 +893 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) { +894 if (runningValue == 0) currStartKey = item.getKey(); +895 runningValue += item.getValue(); +896 if (runningValue == 0) { +897 if (!firstBoundary) keysArray.add(currStartKey); +898 firstBoundary = false; +899 } +900 } +901 +902 return keysArray.toArray(new byte[0][0]);
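----------------------------------------------------------------------
Editor's note on the hunk above: the groupOrSplit() method shown in this diff decides, per HFile, whether the file's [first row, last row] range fits inside one region or must be split at the region end key. The following standalone sketch replays that fit-or-split decision on plain String keys. It is illustrative only and not part of the committed file: the class name, String keys, and sample region layout are hypothetical stand-ins for byte[] keys, Bytes.BYTES_COMPARATOR, and HConstants.EMPTY_BYTE_ARRAY.

import java.util.Arrays;

/**
 * Illustrative sketch of the fit-or-split check in groupOrSplit():
 * binary-search the region start keys for the HFile's first row key,
 * then test whether its last row key still falls before that region's
 * end key (an empty end key marks the open-ended last region).
 */
public class GroupOrSplitSketch {

  /** Index of the region whose [start, end) range should contain firstRow. */
  static int findRegion(String[] regionStartKeys, String firstRow) {
    int idx = Arrays.binarySearch(regionStartKeys, firstRow);
    if (idx < 0) {
      // Not exactly on a boundary: binarySearch returns -(insertionPoint) - 1,
      // so the containing region is the one just before the insertion point.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /** True if the HFile's whole [firstRow, lastRow] range fits inside region idx. */
  static boolean fitsInRegion(String[] regionEndKeys, int idx, String lastRow) {
    String endKey = regionEndKeys[idx];
    return endKey.isEmpty()                 // open-ended last region
        || lastRow.compareTo(endKey) < 0;   // last row still before the region end key
  }

  public static void main(String[] args) {
    // Three regions: [, m), [m, t), [t, ).
    String[] startKeys = {"", "m", "t"};
    String[] endKeys   = {"m", "t", ""};

    int idx = findRegion(startKeys, "p");
    System.out.println("region index: " + idx);                      // 1, i.e. [m, t)
    System.out.println("fits: " + fitsInRegion(endKeys, idx, "s"));  // true  -> just group it
    System.out.println("fits: " + fitsInRegion(endKeys, idx, "z"));  // false -> split at "t"
  }
}

When the second check fails, the real code calls splitStoreFile() with the region's start and end keys and re-queues the resulting bottom and top halves.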
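The inferBoundaries() method at the end of the hunk uses the running-sum bookkeeping described in its comment: each HFile's first row key contributes +1, its last row key contributes -1, and a region boundary is recorded whenever the sum returns to zero (the first group is skipped because the first region's start key is implicit). The sketch below is an illustrative, standalone replay of that idea; it is not part of the committed file, and String keys plus the sample row keys are hypothetical substitutes for the byte[] keys used in the real code.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch of the running-sum boundary inference in inferBoundaries(). */
public class InferBoundariesSketch {

  static List<String> inferBoundaries(TreeMap<String, Integer> bdryMap) {
    List<String> boundaries = new ArrayList<>();
    int runningSum = 0;
    String currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<String, Integer> entry : bdryMap.entrySet()) {
      if (runningSum == 0) {
        currStartKey = entry.getKey();   // a new group of overlapping files starts here
      }
      runningSum += entry.getValue();
      if (runningSum == 0) {             // group closed: its start key is a region boundary
        if (!firstBoundary) {
          boundaries.add(currStartKey);
        }
        firstBoundary = false;
      }
    }
    return boundaries;
  }

  public static void main(String[] args) {
    // Two non-overlapping HFiles: rows "row-a".."row-c" and rows "row-d".."row-f".
    TreeMap<String, Integer> map = new TreeMap<>();
    map.put("row-a", 1);   // first row key of file 1
    map.put("row-c", -1);  // last row key of file 1
    map.put("row-d", 1);   // first row key of file 2
    map.put("row-f", -1);  // last row key of file 2

    // Prints [row-d]: the table would be pre-split into [, row-d) and [row-d, ).
    System.out.println(inferBoundaries(map));
  }
}

Overlapping files keep the sum above zero until the whole group ends, so they land in a single region, which is exactly what createTable() relies on when pre-splitting a new table.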