From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Tue, 31 Oct 2017 15:17:02 -0000
Message-Id: <9bdd75c44bd443e28bb9146342369e69@git.apache.org>
In-Reply-To: <1125b91ba4af4b08b647bebab99dc5b6@git.apache.org>
References: <1125b91ba4af4b08b647bebab99dc5b6@git.apache.org>
Subject: [14/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/35decbe4/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html b/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html
index 87257da..add30e1 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html
@@ -116,1006 +116,1008 @@
 108 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
 109 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
 110 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
-111 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
-112
-113 static class Testing {
-114 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
-115 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
-116 int failuresCountToInject = 0;
-117 int injectedFailureCount = 0;
-118 }
-119
-120 // Command line options and defaults.
-121 static final class Options {
-122 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
-123 static final Option TARGET_NAME = new Option(null, "target", true,
-124 "Target name for the snapshot.");
-125 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
-126 + "destination hdfs://");
-127 static final Option COPY_FROM = new Option(null, "copy-from", true,
-128 "Input folder hdfs:// (default hbase.rootdir)");
-129 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
-130 "Do not verify checksum, use name+length only.");
-131 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
-132 "Do not verify the integrity of the exported snapshot.");
-133 static final Option OVERWRITE = new Option(null, "overwrite", false,
-134 "Rewrite the snapshot manifest if already exists.");
-135 static final Option CHUSER = new Option(null, "chuser", true,
-136 "Change the owner of the files to the specified one.");
-137 static final Option CHGROUP = new Option(null, "chgroup", true,
-138 "Change the group of the files to the specified one.");
-139 static final Option CHMOD = new Option(null, "chmod", true,
-140 "Change the permission of the files to the specified one.");
-141 static final Option MAPPERS = new Option(null, "mappers", true,
-142 "Number of mappers to use during the copy (mapreduce.job.maps).");
-143 static final Option BANDWIDTH = new Option(null, "bandwidth", true,
-144 "Limit bandwidth to this value in MB/second.");
-145 }
-146
-147 // Export Map-Reduce Counters, to keep track of the progress
-148 public enum Counter {
-149 MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
-150 BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
-151 }
-152
-153 private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
-154 NullWritable, NullWritable> {
-155 private static final Log LOG = LogFactory.getLog(ExportMapper.class);
-156 final static int REPORT_SIZE = 1 * 1024 * 1024;
-157 final static int BUFFER_SIZE = 64 * 1024;
-158
-159 private boolean verifyChecksum;
-160 private String filesGroup;
-161 private String filesUser;
-162 private short filesMode;
-163 private int bufferSize;
-164
-165 private FileSystem outputFs;
-166 private Path outputArchive;
-167 private Path outputRoot;
-168
-169 private FileSystem inputFs;
-170 private Path inputArchive;
-171 private Path inputRoot;
-172
-173 private static Testing testing = new Testing();
-174
-175 @Override
-176 public void setup(Context context) throws IOException {
-177 Configuration conf = context.getConfiguration();
-178
-179 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-180 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-181
-182 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
-183
-184 filesGroup = conf.get(CONF_FILES_GROUP);
-185 filesUser = conf.get(CONF_FILES_USER);
-186 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
-187 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
-188 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
-189
-190 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
-191 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
-192
-193 try {
-194 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-195 inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
-196 } catch (IOException e) {
-197 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
-198 }
-199
-200 try {
-201 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-202 outputFs = FileSystem.get(outputRoot.toUri(), destConf);
-203 } catch (IOException e) {
-204 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
-205 }
-206
-207 // Use the default block size of the outputFs if bigger
-208 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
-209 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
-210 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
-211
-212 for (Counter c : Counter.values()) {
-213 context.getCounter(c).increment(0);
-214 }
-215 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
-216 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
-217 // Get number of times we have already injected failure based on attempt number of this
-218 // task.
-219 testing.injectedFailureCount = context.getTaskAttemptID().getId();
-220 }
-221 }
-222
-223 @Override
-224 protected void cleanup(Context context) {
-225 IOUtils.closeStream(inputFs);
-226 IOUtils.closeStream(outputFs);
-227 }
-228
-229 @Override
-230 public void map(BytesWritable key, NullWritable value, Context context)
-231 throws InterruptedException, IOException {
-232 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
-233 Path outputPath = getOutputPath(inputInfo);
-234
-235 copyFile(context, inputInfo, outputPath);
-236 }
-237
-238 /**
-239 * Returns the location where the inputPath will be copied.
-240 */
-241 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
-242 Path path = null;
-243 switch (inputInfo.getType()) {
-244 case HFILE:
-245 Path inputPath = new Path(inputInfo.getHfile());
-246 String family = inputPath.getParent().getName();
-247 TableName table =HFileLink.getReferencedTableName(inputPath.getName());
-248 String region = HFileLink.getReferencedRegionName(inputPath.getName());
-249 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
-250 path = new Path(FSUtils.getTableDir(new Path("./"), table),
-251 new Path(region, new Path(family, hfile)));
-252 break;
-253 case WAL:
-254 LOG.warn("snapshot does not keeps WALs: " + inputInfo);
-255 break;
-256 default:
-257 throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
-258 }
-259 return new Path(outputArchive, path);
-260 }
-261
-262 /**
-263 * Used by TestExportSnapshot to test for retries when failures happen.
-264 * Failure is injected in {@link #copyFile(Context, SnapshotFileInfo, Path)}.
-265 */
-266 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
-267 throws IOException {
-268 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
-269 if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
-270 testing.injectedFailureCount++;
-271 context.getCounter(Counter.COPY_FAILED).increment(1);
-272 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
-273 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
-274 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
-275 }
-276
-277 private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
-278 final Path outputPath) throws IOException {
-279 // Get the file information
-280 FileStatus inputStat = getSourceFileStatus(context, inputInfo);
-281
-282 // Verify if the output file exists and is the same that we want to copy
-283 if (outputFs.exists(outputPath)) {
-284 FileStatus outputStat = outputFs.getFileStatus(outputPath);
-285 if (outputStat != null && sameFile(inputStat, outputStat)) {
-286 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
-287 context.getCounter(Counter.FILES_SKIPPED).increment(1);
-288 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
-289 return;
-290 }
-291 }
-292
-293 InputStream in = openSourceFile(context, inputInfo);
-294 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
-295 if (Integer.MAX_VALUE != bandwidthMB) {
-296 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
-297 }
-298
-299 try {
-300 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
-301
-302 // Ensure that the output folder is there and copy the file
-303 createOutputPath(outputPath.getParent());
-304 FSDataOutputStream out = outputFs.create(outputPath, true);
-305 try {
-306 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
-307 } finally {
-308 out.close();
-309 }
-310
-311 // Try to Preserve attributes
-312 if (!preserveAttributes(outputPath, inputStat)) {
-313 LOG.warn("You may have to run manually chown on: " + outputPath);
-314 }
-315 } finally {
-316 in.close();
-317 injectTestFailure(context, inputInfo);
-318 }
-319 }
-320
-321 /**
-322 * Create the output folder and optionally set ownership.
-323 */
-324 private void createOutputPath(final Path path) throws IOException {
-325 if (filesUser == null && filesGroup == null) {
-326 outputFs.mkdirs(path);
-327 } else {
-328 Path parent = path.getParent();
-329 if (!outputFs.exists(parent) && !parent.isRoot()) {
-330 createOutputPath(parent);
-331 }
-332 outputFs.mkdirs(path);
-333 if (filesUser != null || filesGroup != null) {
-334 // override the owner when non-null user/group is specified
-335 outputFs.setOwner(path, filesUser, filesGroup);
-336 }
-337 if (filesMode > 0) {
-338 outputFs.setPermission(path, new FsPermission(filesMode));
-339 }
-340 }
-341 }
-342
-343 /**
-344 * Try to Preserve the files attribute selected by the user copying them from the source file
-345 * This is only required when you are exporting as a different user than "hbase" or on a system
-346 * that doesn't have the "hbase" user.
-347 *
-348 * This is not considered a blocking failure since the user can force a chmod with the user
-349 * that knows is available on the system.
-350 */
-351 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
-352 FileStatus stat;
-353 try {
-354 stat = outputFs.getFileStatus(path);
-355 } catch (IOException e) {
-356 LOG.warn("Unable to get the status for file=" + path);
-357 return false;
-358 }
-359
-360 try {
-361 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
-362 outputFs.setPermission(path, new FsPermission(filesMode));
-363 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
-364 outputFs.setPermission(path, refStat.getPermission());
-365 }
-366 } catch (IOException e) {
-367 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
-368 return false;
-369 }
-370
-371 boolean hasRefStat = (refStat != null);
-372 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
-373 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
-374 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
-375 try {
-376 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
-377 outputFs.setOwner(path, user, group);
-378 }
-379 } catch (IOException e) {
-380 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
-381 LOG.warn("The user/group may not exist on the destination cluster: user=" +
-382 user + " group=" + group);
-383 return false;
-384 }
-385 }
-386
-387 return true;
-388 }
-389
-390 private boolean stringIsNotEmpty(final String str) {
-391 return str != null && str.length() > 0;
-392 }
-393
-394 private void copyData(final Context context,
-395 final Path inputPath, final InputStream in,
-396 final Path outputPath, final FSDataOutputStream out,
-397 final long inputFileSize)
-398 throws IOException {
-399 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
-400 " (%.1f%%)";
-401
-402 try {
-403 byte[] buffer = new byte[bufferSize];
-404 long totalBytesWritten = 0;
-405 int reportBytes = 0;
-406 int bytesRead;
-407
-408 long stime = System.currentTimeMillis();
-409 while ((bytesRead = in.read(buffer)) > 0) {
-410 out.write(buffer, 0, bytesRead);
-411 totalBytesWritten += bytesRead;
-412 reportBytes += bytesRead;
-413
-414 if (reportBytes >= REPORT_SIZE) {
-415 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
-416 context.setStatus(String.format(statusMessage,
-417 StringUtils.humanReadableInt(totalBytesWritten),
-418 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
-419 " from " + inputPath + " to " + outputPath);
-420 reportBytes = 0;
-421 }
-422 }
-423 long etime = System.currentTimeMillis();
-424
-425 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
-426 context.setStatus(String.format(statusMessage,
-427 StringUtils.humanReadableInt(totalBytesWritten),
-428 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
-429 " from " + inputPath + " to " + outputPath);
-430
-431 // Verify that the written size match
-432 if (totalBytesWritten != inputFileSize) {
-433 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
-434 " expected=" + inputFileSize + " for file=" + inputPath;
-435 throw new IOException(msg);
-436 }
-437
-438 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
-439 LOG.info("size=" + totalBytesWritten +
-440 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
-441 " time=" + StringUtils.formatTimeDiff(etime, stime) +
-442 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
-443 context.getCounter(Counter.FILES_COPIED).increment(1);
-444 } catch (IOException e) {
-445 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
-446 context.getCounter(Counter.COPY_FAILED).increment(1);
-447 throw e;
-448 }
-449 }
-450
-451 /**
-452 * Try to open the "source" file.
-453 * Throws an IOException if the communication with the inputFs fail or
-454 * if the file is not found.
-455 */
-456 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
-457 throws IOException {
-458 try {
-459 Configuration conf = context.getConfiguration();
-460 FileLink link = null;
-461 switch (fileInfo.getType()) {
-462 case HFILE:
-463 Path inputPath = new Path(fileInfo.getHfile());
-464 link = getFileLink(inputPath, conf);
-465 break;
-466 case WAL:
-467 String serverName = fileInfo.getWalServer();
-468 String logName = fileInfo.getWalName();
-469 link = new WALLink(inputRoot, serverName, logName);
-470 break;
-471 default:
-472 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
-473 }
-474 return link.open(inputFs);
-475 } catch (IOException e) {
-476 context.getCounter(Counter.MISSING_FILES).increment(1);
-477 LOG.error("Unable to open source file=" + fileInfo.toString(), e);
-478 throw e;
-479 }
-480 }
-481
-482 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
-483 throws IOException {
-484 try {
-485 Configuration conf = context.getConfiguration();
-486 FileLink link = null;
-487 switch (fileInfo.getType()) {
-488 case HFILE:
-489 Path inputPath = new Path(fileInfo.getHfile());
-490 link = getFileLink(inputPath, conf);
-491 break;
-492 case WAL:
-493 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
-494 break;
-495 default:
-496 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
-497 }
-498 return link.getFileStatus(inputFs);
-499 } catch (FileNotFoundException e) {
-500 context.getCounter(Counter.MISSING_FILES).increment(1);
-501 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
-502 throw e;
-503 } catch (IOException e) {
-504 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
-505 throw e;
-506 }
-507 }
-508
-509 private FileLink getFileLink(Path path, Configuration conf) throws IOException{
-510 String regionName = HFileLink.getReferencedRegionName(path.getName());
-511 TableName tableName = HFileLink.getReferencedTableName(path.getName());
-512 if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
-513 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
-514 HFileArchiveUtil.getArchivePath(conf), path);
-515 }
-516 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
-517 }
-518
-519 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
-520 try {
-521 return fs.getFileChecksum(path);
-522 } catch (IOException e) {
-523 LOG.warn("Unable to get checksum for file=" + path, e);
-524 return null;
-525 }
-526 }
-527
-528 /**
-529 * Check if the two files are equal by looking at the file length,
-530 * and at the checksum (if user has specified the verifyChecksum flag).
-531 */
-532 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
-533 // Not matching length
-534 if (inputStat.getLen() != outputStat.getLen()) return false;
-535
-536 // Mark files as equals, since user asked for no checksum verification
-537 if (!verifyChecksum) return true;
-538
-539 // If checksums are not available, files are not the same.
-540 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
-541 if (inChecksum == null) return false;
-542
-543 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
-544 if (outChecksum == null) return false;
-545
-546 return inChecksum.equals(outChecksum);
-547 }
-548 }
-549
-550 // ==========================================================================
-551 // Input Format
-552 // ==========================================================================
-553
-554 /**
-555 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
-556 * @return list of files referenced by the snapshot (pair of path and size)
-557 */
-558 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
-559 final FileSystem fs, final Path snapshotDir) throws IOException {
-560 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-561
-562 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
-563 final TableName table = TableName.valueOf(snapshotDesc.getTable());
-564
-565 // Get snapshot files
-566 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
-567 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
-568 new SnapshotReferenceUtil.SnapshotVisitor() {
-569 @Override
-570 public void storeFile(final RegionInfo regionInfo, final String family,
-571 final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-572 // for storeFile.hasReference() case, copied as part of the manifest
-573 if (!storeFile.hasReference()) {
-574 String region = regionInfo.getEncodedName();
-575 String hfile = storeFile.getName();
-576 Path path = HFileLink.createPath(table, region, family, hfile);
-577
-578 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
-579 .setType(SnapshotFileInfo.Type.HFILE)
-580 .setHfile(path.toString())
-581 .build();
-582
-583 long size;
-584 if (storeFile.hasFileSize()) {
-585 size = storeFile.getFileSize();
-586 } else {
-587 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
-588 }
-589 files.add(new Pair<>(fileInfo, size));
-590 }
-591 }
-592 });
-593
-594 return files;
-595 }
-596
-597 /**
-598 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
-599 * The groups created will have similar amounts of bytes.
-600 * <p>
-601 * The algorithm used is pretty straightforward; the file list is sorted by size,
-602 * and then each group fetch the bigger file available, iterating through groups
-603 * alternating the direction.
-604 */
-605 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
-606 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
-607 // Sort files by size, from small to big
-608 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
-609 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
-610 long r = a.getSecond() - b.getSecond();
-611 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
-612 }
-613 });
-614
-615 // create balanced groups
-616 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
-617 long[] sizeGroups = new long[ngroups];
-618 int hi = files.size() - 1;
-619 int lo = 0;
-620
-621 List<Pair<SnapshotFileInfo, Long>> group;
-622 int dir = 1;
-623 int g = 0;
-624
-625 while (hi >= lo) {
-626 if (g == fileGroups.size()) {
-627 group = new LinkedList<>();
-628 fileGroups.add(group);
-629 } else {
-630 group = fileGroups.get(g);
-631 }
-632
-633 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
-634
-635 // add the hi one
-636 sizeGroups[g] += fileInfo.getSecond();
-637 group.add(fileInfo);
-638
-639 // change direction when at the end or the beginning
-640 g += dir;
-641 if (g == ngroups) {
-642 dir = -1;
-643 g = ngroups - 1;
-644 } else if (g < 0) {
-645 dir = 1;
-646 g = 0;
-647 }
-648 }
-649
-650 if (LOG.isDebugEnabled()) {
-651 for (int i = 0; i < sizeGroups.length; ++i) {
-652 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
-653 }
-654 }
-655
-656 return fileGroups;
-657 }
-658
-659 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
-660 @Override
-661 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
-662 TaskAttemptContext tac) throws IOException, InterruptedException {
-663 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
-664 }
-665
-666 @Override
-667 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-668 Configuration conf = context.getConfiguration();
-669 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
-670 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
-671
-672 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
-673 int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
-674 if (mappers == 0 && snapshotFiles.size() > 0) {
-675 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
-676 mappers = Math.min(mappers, snapshotFiles.size());
-677 conf.setInt(CONF_NUM_SPLITS, mappers);
-678 conf.setInt(MR_NUM_MAPS, mappers);
-679 }
-680
-681 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
-682 List<InputSplit> splits = new ArrayList(groups.size());
-683 for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
-684 splits.add(new ExportSnapshotInputSplit(files));
-685 }
-686 return splits;
-687 }
-688
-689 private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
-690 private List<Pair<BytesWritable, Long>> files;
-691 private long length;
-692
-693 public ExportSnapshotInputSplit() {
-694 this.files = null;
-695 }
-696
-697 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
-698 this.files = new ArrayList(snapshotFiles.size());
-699 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
-700 this.files.add(new Pair<>(
-701 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
-702 this.length += fileInfo.getSecond();
-703 }
-704 }
-705
-706 private List<Pair<BytesWritable, Long>> getSplitKeys() {
-707 return files;
-708 }
-709
-710 @Override
-711 public long getLength() throws IOException, InterruptedException {
-712 return length;
-713 }
-714
-715 @Override
-716 public String[] getLocations() throws IOException, InterruptedException {
-717 return new String[] {};
-718 }
-719
-720 @Override
-721 public void readFields(DataInput in) throws IOException {
-722 int count = in.readInt();
-723 files = new ArrayList<>(count);
-724 length = 0;
-725 for (int i = 0; i < count; ++i) {
-726 BytesWritable fileInfo = new BytesWritable();
-727 fileInfo.readFields(in);
-728 long size = in.readLong();
-729 files.add(new Pair<>(fileInfo, size));
-730 length += size;
-731 }
-732 }
-733
-734 @Override
-735 public void write(DataOutput out) throws IOException {
-736 out.writeInt(files.size());
-737 for (final Pair<BytesWritable, Long> fileInfo: files) {
-738 fileInfo.getFirst().write(out);
-739 out.writeLong(fileInfo.getSecond());
-740 }
-741 }
-742 }
-743
-744 private static class ExportSnapshotRecordReader
-745 extends RecordReader<BytesWritable, NullWritable> {
-746 private final List<Pair<BytesWritable, Long>> files;
-747 private long totalSize = 0;
-748 private long procSize = 0;
-749 private int index = -1;
-750
-751 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
-752 this.files = files;
-753 for (Pair<BytesWritable, Long> fileInfo: files) {
-754 totalSize += fileInfo.getSecond();
-755 }
-756 }
-757
-758 @Override
-759 public void close() { }
-760
-761 @Override
-762 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
-763
-764 @Override
-765 public NullWritable getCurrentValue() { return NullWritable.get(); }
-766
-767 @Override
-768 public float getProgress() { return (float)procSize / totalSize; }
-769
-770 @Override
-771 public void initialize(InputSplit split, TaskAttemptContext tac) { }
-772
-773 @Override
-774 public boolean nextKeyValue() {
-775 if (index >= 0) {
-776 procSize += files.get(index).getSecond();
-777 }
-778 return(++index < files.size());
-779 }
-780 }
-781 }
-782
-783 // ==========================================================================
-784 // Tool
-785 // ==========================================================================
-786
-787 /**
-788 * Run Map-Reduce Job to perform the files copy.
-789 */
-790 private void runCopyJob(final Path inputRoot, final Path outputRoot,
-791 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
-792 final String filesUser, final String filesGroup, final int filesMode,
-793 final int mappers, final int bandwidthMB)
-794 throws IOException, InterruptedException, ClassNotFoundException {
-795 Configuration conf = getConf();
-796 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
-797 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
-798 if (mappers > 0) {
-799 conf.setInt(CONF_NUM_SPLITS, mappers);
-800 conf.setInt(MR_NUM_MAPS, mappers);
-801 }
-802 conf.setInt(CONF_FILES_MODE, filesMode);
-803 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
-804 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
-805 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
-806 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
-807 conf.set(CONF_SNAPSHOT_NAME, snapshotName);
-808 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
-809
-810 Job job = new Job(conf);
-811 job.setJobName("ExportSnapshot-" + snapshotName);
-812 job.setJarByClass(ExportSnapshot.class);
-813 TableMapReduceUtil.addDependencyJars(job);
-814 job.setMapperClass(ExportMapper.class);
-815 job.setInputFormatClass(ExportSnapshotInputFormat.class);
-816 job.setOutputFormatClass(NullOutputFormat.class);
-817 job.setMapSpeculativeExecution(false);
-818 job.setNumReduceTasks(0);
-819
-820 // Acquire the delegation Tokens
-821 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-822 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-823 new Path[] { inputRoot }, srcConf);
-824 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-825 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-826 new Path[] { outputRoot }, destConf);
-827
-828 // Run the MR Job
-829 if (!job.waitForCompletion(true)) {
-830 throw new ExportSnapshotException(job.getStatus().getFailureInfo());
-831 }
-832 }
-833
-834 private void verifySnapshot(final Configuration baseConf,
-835 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
-836 // Update the conf with the current root dir, since may be a different cluster
-837 Configuration conf = new Configuration(baseConf);
-838 FSUtils.setRootDir(conf, rootDir);
-839 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
-840 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-841 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
-842 }
-843
-844 /**
-845 * Set path ownership.
-846 */
-847 private void setOwner(final FileSystem fs, final Path path, final String user,
-848 final String group, final boolean recursive) throws IOException {
-849 if (user != null || group != null) {
-850 if (recursive && fs.isDirectory(path)) {
-851 for (FileStatus child : fs.listStatus(path)) {
-852 setOwner(fs, child.getPath(), user, group, recursive);
-853 }
-854 }
-855 fs.setOwner(path, user, group);
-856 }
-857 }
-858
-859 /**
-860 * Set path permission.
-861 */
-862 private void setPermission(final FileSystem fs, final Path path, final short filesMode,
-863 final boolean recursive) throws IOException {
-864 if (filesMode > 0) {
-865 FsPermission perm = new FsPermission(filesMode);
-866 if (recursive && fs.isDirectory(path)) {
-867 for (FileStatus child : fs.listStatus(path)) {
-868 setPermission(fs, child.getPath(), filesMode, recursive);
-869 }
-870 }
-871 fs.setPermission(path, perm);
-872 }
-873 }
-874
-875 private boolean verifyTarget = true;
-876 private boolean verifyChecksum = true;
-877 private String snapshotName = null;
-878 private String targetName = null;
-879 private boolean overwrite = false;
-880 private String filesGroup = null;
-881 private String filesUser = null;
-882 private Path outputRoot = null;
-883 private Path inputRoot = null;
-884 private int bandwidthMB = Integer.MAX_VALUE;
-885 private int filesMode = 0;
-886 private int mappers = 0;
-887
-888 @Override
-889 protected void processOptions(CommandLine cmd) {
-890 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
-891 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
-892 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
-893 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
-894 }
-895 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
-896 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
-897 }
-898 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
-899 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
-900 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
-901 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode);
-902 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
-903 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
-904 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
-905 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
-906 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
-907 }
-908
-909 /**
-910 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
-911 * @return 0 on success, and != 0 upon failure.
-912 */
-913 @Override
-914 public int doWork() throws IOException {
-915 Configuration conf = getConf();
-916
-917 // Check user options
-918 if (snapshotName == null) {
-919 System.err.println("Snapshot name not provided.");
-920 LOG.error("Use -h or --help for usage instructions.");
-921 return 0;
-922 }
-923
-924 if (outputRoot == null) {
-925 System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
-926 + ") not provided.");
-927 LOG.error("Use -h or --help for usage instructions.");
-928 return 0;
-929 }
-930
-931 if (targetName == null) {
-932 targetName = snapshotName;
-933 }
-934 if (inputRoot == null) {
-935 inputRoot = FSUtils.getRootDir(conf);
-936 } else {
-937 FSUtils.setRootDir(conf, inputRoot);
-938 }
-939
-940 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-941 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-942 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
-943 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
-944 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-945 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-946 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
-947 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
-948
-949 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
+111 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
+112 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+113
+114 static class Testing {
+115 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
+116 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
+117 int failuresCountToInject = 0;
+118 int injectedFailureCount = 0;
+119 }
+120
+121 // Command line options and defaults.
+122 static final class Options {
+123 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
+124 static final Option TARGET_NAME = new Option(null, "target", true,
+125 "Target name for the snapshot.");
+126 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
+127 + "destination hdfs://");
+128 static final Option COPY_FROM = new Option(null, "copy-from", true,
+129 "Input folder hdfs:// (default hbase.rootdir)");
+130 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
+131 "Do not verify checksum, use name+length only.");
+132 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
+133 "Do not verify the integrity of the exported snapshot.");
+134 static final Option OVERWRITE = new Option(null, "overwrite", false,
+135 "Rewrite the snapshot manifest if already exists.");
+136 static final Option CHUSER = new Option(null, "chuser", true,
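
Editor's note on the diff above: the part of ExportSnapshot that decides how much work each map task gets is getBalancedSplits (source lines 597-657 in the removed hunk): files are sorted by size, then dealt out largest-first to the groups, sweeping across them and bouncing at either end so the byte totals stay close. The following is a minimal, self-contained Java sketch of that zig-zag dealing idea only; the class name BalancedSplitSketch, the balance() method, and the plain long sizes are illustrative stand-ins (the real method works on Pair<SnapshotFileInfo, Long> and sorts the caller's list in place), not HBase API.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Hypothetical sketch of the zig-zag grouping used by getBalancedSplits.
    public class BalancedSplitSketch {
      static List<List<Long>> balance(List<Long> sizes, int ngroups) {
        List<Long> sorted = new ArrayList<>(sizes);
        sorted.sort(Comparator.naturalOrder());            // small to big
        List<List<Long>> groups = new ArrayList<>();
        long[] totals = new long[ngroups];                  // running byte total per group
        int hi = sorted.size() - 1, g = 0, dir = 1;
        while (hi >= 0) {
          if (g == groups.size()) {
            groups.add(new ArrayList<>());
          }
          long size = sorted.get(hi--);                      // biggest file still unassigned
          totals[g] += size;
          groups.get(g).add(size);
          g += dir;                                          // move to the next group...
          if (g == ngroups) { dir = -1; g = ngroups - 1; }   // ...bouncing at either end
          else if (g < 0)   { dir = 1;  g = 0; }
        }
        return groups;
      }

      public static void main(String[] args) {
        // With 3 groups: [[200, 5], [120, 10], [70, 35]] -> totals 205 / 130 / 105
        System.out.println(balance(List.of(10L, 200L, 35L, 70L, 120L, 5L), 3));
      }
    }

Dealing from the largest file downward spreads the big HFiles across mappers first, which is why no single map task in the export job ends up copying all of the heavyweight files.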