Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0BFFE200C64 for ; Thu, 13 Apr 2017 16:58:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0AA95160BA7; Thu, 13 Apr 2017 14:58:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BC114160BAA for ; Thu, 13 Apr 2017 16:58:44 +0200 (CEST) Received: (qmail 37728 invoked by uid 500); 13 Apr 2017 14:58:38 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 37354 invoked by uid 99); 13 Apr 2017 14:58:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Apr 2017 14:58:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 31783E38E1; Thu, 13 Apr 2017 14:58:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 13 Apr 2017 14:58:43 -0000 Message-Id: In-Reply-To: <65d7df08721348b1af9d13c3b8625875@git.apache.org> References: <65d7df08721348b1af9d13c3b8625875@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/22] hbase-site git commit: Published site at da5fb27eabed4a4b4d251be973ee945fb52895bf. archived-at: Thu, 13 Apr 2017 14:58:47 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7b1830cf/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html b/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html index 2a09edb..b6fb3b4 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/snapshot/ExportSnapshot.html @@ -37,131 +37,131 @@ 029import java.util.Comparator; 030import java.util.LinkedList; 031import java.util.List; -032import java.util.Random; -033 -034import org.apache.commons.cli.CommandLine; -035import org.apache.commons.cli.Option; -036import org.apache.commons.logging.Log; -037import org.apache.commons.logging.LogFactory; -038import org.apache.hadoop.hbase.classification.InterfaceAudience; -039import org.apache.hadoop.conf.Configuration; -040import org.apache.hadoop.fs.FSDataInputStream; -041import org.apache.hadoop.fs.FSDataOutputStream; -042import org.apache.hadoop.fs.FileChecksum; -043import org.apache.hadoop.fs.FileStatus; -044import org.apache.hadoop.fs.FileSystem; -045import org.apache.hadoop.fs.FileUtil; -046import org.apache.hadoop.fs.Path; -047import org.apache.hadoop.fs.permission.FsPermission; -048import org.apache.hadoop.hbase.TableName; -049import org.apache.hadoop.hbase.HBaseConfiguration; -050import org.apache.hadoop.hbase.HConstants; -051import org.apache.hadoop.hbase.HRegionInfo; -052import org.apache.hadoop.hbase.io.FileLink; -053import org.apache.hadoop.hbase.io.HFileLink; -054import org.apache.hadoop.hbase.io.WALLink; -055import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -056import org.apache.hadoop.hbase.mob.MobUtils; -057import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; -058import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; -059import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; -060import org.apache.hadoop.hbase.util.AbstractHBaseTool; -061import org.apache.hadoop.hbase.util.FSUtils; -062import org.apache.hadoop.hbase.util.HFileArchiveUtil; -063import org.apache.hadoop.hbase.util.Pair; -064import org.apache.hadoop.io.BytesWritable; -065import org.apache.hadoop.io.IOUtils; -066import org.apache.hadoop.io.NullWritable; -067import org.apache.hadoop.io.Writable; -068import org.apache.hadoop.mapreduce.Job; -069import org.apache.hadoop.mapreduce.JobContext; -070import org.apache.hadoop.mapreduce.Mapper; -071import org.apache.hadoop.mapreduce.InputFormat; -072import org.apache.hadoop.mapreduce.InputSplit; -073import org.apache.hadoop.mapreduce.RecordReader; -074import org.apache.hadoop.mapreduce.TaskAttemptContext; -075import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -076import org.apache.hadoop.mapreduce.security.TokenCache; -077import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; -078import org.apache.hadoop.util.StringUtils; -079import org.apache.hadoop.util.Tool; -080 -081/** -082 * Export the specified snapshot to a given FileSystem. -083 * -084 * The .snapshot/name folder is copied to the destination cluster -085 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location. -086 * When everything is done, the second cluster can restore the snapshot. -087 */ -088@InterfaceAudience.Public -089public class ExportSnapshot extends AbstractHBaseTool implements Tool { -090 public static final String NAME = "exportsnapshot"; -091 /** Configuration prefix for overrides for the source filesystem */ -092 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; -093 /** Configuration prefix for overrides for the destination filesystem */ -094 public static final String CONF_DEST_PREFIX = NAME + ".to."; -095 -096 private static final Log LOG = LogFactory.getLog(ExportSnapshot.class); -097 -098 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; -099 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; -100 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; -101 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; -102 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; -103 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; -104 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; -105 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; -106 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; -107 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; -108 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; -109 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; -110 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; -111 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; -112 -113 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; -114 static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry"; -115 -116 -117 // Command line options and defaults. -118 static final class Options { -119 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); -120 static final Option TARGET_NAME = new Option(null, "target", true, -121 "Target name for the snapshot."); -122 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote " -123 + "destination hdfs://"); -124 static final Option COPY_FROM = new Option(null, "copy-from", true, -125 "Input folder hdfs:// (default hbase.rootdir)"); -126 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, -127 "Do not verify checksum, use name+length only."); -128 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, -129 "Do not verify the integrity of the exported snapshot."); -130 static final Option OVERWRITE = new Option(null, "overwrite", false, -131 "Rewrite the snapshot manifest if already exists."); -132 static final Option CHUSER = new Option(null, "chuser", true, -133 "Change the owner of the files to the specified one."); -134 static final Option CHGROUP = new Option(null, "chgroup", true, -135 "Change the group of the files to the specified one."); -136 static final Option CHMOD = new Option(null, "chmod", true, -137 "Change the permission of the files to the specified one."); -138 static final Option MAPPERS = new Option(null, "mappers", true, -139 "Number of mappers to use during the copy (mapreduce.job.maps)."); -140 static final Option BANDWIDTH = new Option(null, "bandwidth", true, -141 "Limit bandwidth to this value in MB/second."); -142 } -143 -144 // Export Map-Reduce Counters, to keep track of the progress -145 public enum Counter { -146 MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED, -147 BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED -148 } -149 -150 private static class ExportMapper extends Mapper<BytesWritable, NullWritable, -151 NullWritable, NullWritable> { -152 final static int REPORT_SIZE = 1 * 1024 * 1024; -153 final static int BUFFER_SIZE = 64 * 1024; -154 -155 private boolean testFailures; -156 private Random random; +032 +033import org.apache.commons.cli.CommandLine; +034import org.apache.commons.cli.Option; +035import org.apache.commons.logging.Log; +036import org.apache.commons.logging.LogFactory; +037import org.apache.hadoop.hbase.classification.InterfaceAudience; +038import org.apache.hadoop.conf.Configuration; +039import org.apache.hadoop.fs.FSDataInputStream; +040import org.apache.hadoop.fs.FSDataOutputStream; +041import org.apache.hadoop.fs.FileChecksum; +042import org.apache.hadoop.fs.FileStatus; +043import org.apache.hadoop.fs.FileSystem; +044import org.apache.hadoop.fs.FileUtil; +045import org.apache.hadoop.fs.Path; +046import org.apache.hadoop.fs.permission.FsPermission; +047import org.apache.hadoop.hbase.TableName; +048import org.apache.hadoop.hbase.HBaseConfiguration; +049import org.apache.hadoop.hbase.HConstants; +050import org.apache.hadoop.hbase.HRegionInfo; +051import org.apache.hadoop.hbase.io.FileLink; +052import org.apache.hadoop.hbase.io.HFileLink; +053import org.apache.hadoop.hbase.io.WALLink; +054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +055import org.apache.hadoop.hbase.mob.MobUtils; +056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +057import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; +058import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; +059import org.apache.hadoop.hbase.util.AbstractHBaseTool; +060import org.apache.hadoop.hbase.util.FSUtils; +061import org.apache.hadoop.hbase.util.HFileArchiveUtil; +062import org.apache.hadoop.hbase.util.Pair; +063import org.apache.hadoop.io.BytesWritable; +064import org.apache.hadoop.io.IOUtils; +065import org.apache.hadoop.io.NullWritable; +066import org.apache.hadoop.io.Writable; +067import org.apache.hadoop.mapreduce.Job; +068import org.apache.hadoop.mapreduce.JobContext; +069import org.apache.hadoop.mapreduce.Mapper; +070import org.apache.hadoop.mapreduce.InputFormat; +071import org.apache.hadoop.mapreduce.InputSplit; +072import org.apache.hadoop.mapreduce.RecordReader; +073import org.apache.hadoop.mapreduce.TaskAttemptContext; +074import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +075import org.apache.hadoop.mapreduce.security.TokenCache; +076import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; +077import org.apache.hadoop.util.StringUtils; +078import org.apache.hadoop.util.Tool; +079 +080/** +081 * Export the specified snapshot to a given FileSystem. +082 * +083 * The .snapshot/name folder is copied to the destination cluster +084 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location. +085 * When everything is done, the second cluster can restore the snapshot. +086 */ +087@InterfaceAudience.Public +088public class ExportSnapshot extends AbstractHBaseTool implements Tool { +089 public static final String NAME = "exportsnapshot"; +090 /** Configuration prefix for overrides for the source filesystem */ +091 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; +092 /** Configuration prefix for overrides for the destination filesystem */ +093 public static final String CONF_DEST_PREFIX = NAME + ".to."; +094 +095 private static final Log LOG = LogFactory.getLog(ExportSnapshot.class); +096 +097 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; +098 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; +099 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; +100 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; +101 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; +102 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; +103 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; +104 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; +105 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; +106 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; +107 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; +108 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; +109 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; +110 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; +111 +112 static class Testing { +113 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; +114 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; +115 int failuresCountToInject = 0; +116 int injectedFailureCount = 0; +117 } +118 +119 // Command line options and defaults. +120 static final class Options { +121 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); +122 static final Option TARGET_NAME = new Option(null, "target", true, +123 "Target name for the snapshot."); +124 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote " +125 + "destination hdfs://"); +126 static final Option COPY_FROM = new Option(null, "copy-from", true, +127 "Input folder hdfs:// (default hbase.rootdir)"); +128 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, +129 "Do not verify checksum, use name+length only."); +130 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, +131 "Do not verify the integrity of the exported snapshot."); +132 static final Option OVERWRITE = new Option(null, "overwrite", false, +133 "Rewrite the snapshot manifest if already exists."); +134 static final Option CHUSER = new Option(null, "chuser", true, +135 "Change the owner of the files to the specified one."); +136 static final Option CHGROUP = new Option(null, "chgroup", true, +137 "Change the group of the files to the specified one."); +138 static final Option CHMOD = new Option(null, "chmod", true, +139 "Change the permission of the files to the specified one."); +140 static final Option MAPPERS = new Option(null, "mappers", true, +141 "Number of mappers to use during the copy (mapreduce.job.maps)."); +142 static final Option BANDWIDTH = new Option(null, "bandwidth", true, +143 "Limit bandwidth to this value in MB/second."); +144 } +145 +146 // Export Map-Reduce Counters, to keep track of the progress +147 public enum Counter { +148 MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED, +149 BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED +150 } +151 +152 private static class ExportMapper extends Mapper<BytesWritable, NullWritable, +153 NullWritable, NullWritable> { +154 private static final Log LOG = LogFactory.getLog(ExportMapper.class); +155 final static int REPORT_SIZE = 1 * 1024 * 1024; +156 final static int BUFFER_SIZE = 64 * 1024; 157 158 private boolean verifyChecksum; 159 private String filesGroup; @@ -177,950 +177,946 @@ 169 private Path inputArchive; 170 private Path inputRoot; 171 -172 @Override -173 public void setup(Context context) throws IOException { -174 Configuration conf = context.getConfiguration(); -175 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); -176 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); +172 private static Testing testing = new Testing(); +173 +174 @Override +175 public void setup(Context context) throws IOException { +176 Configuration conf = context.getConfiguration(); 177 -178 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); -179 -180 filesGroup = conf.get(CONF_FILES_GROUP); -181 filesUser = conf.get(CONF_FILES_USER); -182 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); -183 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); -184 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); -185 -186 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); -187 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); +178 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); +179 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); +180 +181 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); +182 +183 filesGroup = conf.get(CONF_FILES_GROUP); +184 filesUser = conf.get(CONF_FILES_USER); +185 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); +186 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); +187 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 188 -189 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false); -190 -191 try { -192 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); -193 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); -194 } catch (IOException e) { -195 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); -196 } -197 -198 try { -199 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); -200 outputFs = FileSystem.get(outputRoot.toUri(), destConf); -201 } catch (IOException e) { -202 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); -203 } -204 -205 // Use the default block size of the outputFs if bigger -206 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); -207 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); -208 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); -209 -210 for (Counter c : Counter.values()) { -211 context.getCounter(c).increment(0); -212 } -213 } -214 -215 @Override -216 protected void cleanup(Context context) { -217 IOUtils.closeStream(inputFs); -218 IOUtils.closeStream(outputFs); -219 } -220 -221 @Override -222 public void map(BytesWritable key, NullWritable value, Context context) -223 throws InterruptedException, IOException { -224 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); -225 Path outputPath = getOutputPath(inputInfo); -226 -227 copyFile(context, inputInfo, outputPath); -228 } -229 -230 /** -231 * Returns the location where the inputPath will be copied. -232 */ -233 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { -234 Path path = null; -235 switch (inputInfo.getType()) { -236 case HFILE: -237 Path inputPath = new Path(inputInfo.getHfile()); -238 String family = inputPath.getParent().getName(); -239 TableName table =HFileLink.getReferencedTableName(inputPath.getName()); -240 String region = HFileLink.getReferencedRegionName(inputPath.getName()); -241 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); -242 path = new Path(FSUtils.getTableDir(new Path("./"), table), -243 new Path(region, new Path(family, hfile))); -244 break; -245 case WAL: -246 LOG.warn("snapshot does not keeps WALs: " + inputInfo); -247 break; -248 default: -249 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); -250 } -251 return new Path(outputArchive, path); -252 } -253 -254 /* -255 * Used by TestExportSnapshot to simulate a failure -256 */ -257 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) -258 throws IOException { -259 if (testFailures) { -260 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) { -261 if (random == null) { -262 random = new Random(); -263 } -264 -265 // FLAKY-TEST-WARN: lower is better, we can get some runs without the -266 // retry, but at least we reduce the number of test failures due to -267 // this test exception from the same map task. -268 if (random.nextFloat() < 0.03) { -269 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo -270 + " time=" + System.currentTimeMillis()); -271 } -272 } else { -273 context.getCounter(Counter.COPY_FAILED).increment(1); -274 throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo); -275 } -276 } -277 } -278 -279 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, -280 final Path outputPath) throws IOException { -281 injectTestFailure(context, inputInfo); -282 -283 // Get the file information -284 FileStatus inputStat = getSourceFileStatus(context, inputInfo); -285 -286 // Verify if the output file exists and is the same that we want to copy -287 if (outputFs.exists(outputPath)) { -288 FileStatus outputStat = outputFs.getFileStatus(outputPath); -289 if (outputStat != null && sameFile(inputStat, outputStat)) { -290 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); -291 context.getCounter(Counter.FILES_SKIPPED).increment(1); -292 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); -293 return; -294 } -295 } -296 -297 InputStream in = openSourceFile(context, inputInfo); -298 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); -299 if (Integer.MAX_VALUE != bandwidthMB) { -300 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); -301 } -302 -303 try { -304 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); -305 -306 // Ensure that the output folder is there and copy the file -307 createOutputPath(outputPath.getParent()); -308 FSDataOutputStream out = outputFs.create(outputPath, true); -309 try { -310 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); -311 } finally { -312 out.close(); +189 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); +190 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); +191 +192 try { +193 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); +194 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); +195 } catch (IOException e) { +196 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); +197 } +198 +199 try { +200 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); +201 outputFs = FileSystem.get(outputRoot.toUri(), destConf); +202 } catch (IOException e) { +203 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); +204 } +205 +206 // Use the default block size of the outputFs if bigger +207 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); +208 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); +209 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); +210 +211 for (Counter c : Counter.values()) { +212 context.getCounter(c).increment(0); +213 } +214 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { +215 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); +216 // Get number of times we have already injected failure based on attempt number of this +217 // task. +218 testing.injectedFailureCount = context.getTaskAttemptID().getId(); +219 } +220 } +221 +222 @Override +223 protected void cleanup(Context context) { +224 IOUtils.closeStream(inputFs); +225 IOUtils.closeStream(outputFs); +226 } +227 +228 @Override +229 public void map(BytesWritable key, NullWritable value, Context context) +230 throws InterruptedException, IOException { +231 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); +232 Path outputPath = getOutputPath(inputInfo); +233 +234 copyFile(context, inputInfo, outputPath); +235 } +236 +237 /** +238 * Returns the location where the inputPath will be copied. +239 */ +240 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { +241 Path path = null; +242 switch (inputInfo.getType()) { +243 case HFILE: +244 Path inputPath = new Path(inputInfo.getHfile()); +245 String family = inputPath.getParent().getName(); +246 TableName table =HFileLink.getReferencedTableName(inputPath.getName()); +247 String region = HFileLink.getReferencedRegionName(inputPath.getName()); +248 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); +249 path = new Path(FSUtils.getTableDir(new Path("./"), table), +250 new Path(region, new Path(family, hfile))); +251 break; +252 case WAL: +253 LOG.warn("snapshot does not keeps WALs: " + inputInfo); +254 break; +255 default: +256 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); +257 } +258 return new Path(outputArchive, path); +259 } +260 +261 /** +262 * Used by TestExportSnapshot to test for retries when failures happen. +263 * Failure is injected in {@link #copyFile(Context, SnapshotFileInfo, Path)}. +264 */ +265 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) +266 throws IOException { +267 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; +268 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; +269 testing.injectedFailureCount++; +270 context.getCounter(Counter.COPY_FAILED).increment(1); +271 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); +272 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", +273 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); +274 } +275 +276 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, +277 final Path outputPath) throws IOException { +278 // Get the file information +279 FileStatus inputStat = getSourceFileStatus(context, inputInfo); +280 +281 // Verify if the output file exists and is the same that we want to copy +282 if (outputFs.exists(outputPath)) { +283 FileStatus outputStat = outputFs.getFileStatus(outputPath); +284 if (outputStat != null && sameFile(inputStat, outputStat)) { +285 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); +286 context.getCounter(Counter.FILES_SKIPPED).increment(1); +287 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); +288 return; +289 } +290 } +291 +292 InputStream in = openSourceFile(context, inputInfo); +293 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); +294 if (Integer.MAX_VALUE != bandwidthMB) { +295 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); +296 } +297 +298 try { +299 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); +300 +301 // Ensure that the output folder is there and copy the file +302 createOutputPath(outputPath.getParent()); +303 FSDataOutputStream out = outputFs.create(outputPath, true); +304 try { +305 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); +306 } finally { +307 out.close(); +308 } +309 +310 // Try to Preserve attributes +311 if (!preserveAttributes(outputPath, inputStat)) { +312 LOG.warn("You may have to run manually chown on: " + outputPath); 313 } -314 -315 // Try to Preserve attributes -316 if (!preserveAttributes(outputPath, inputStat)) { -317 LOG.warn("You may have to run manually chown on: " + outputPath); -318 } -319 } finally { -320 in.close(); -321 } -322 } -323 -324 /** -325 * Create the output folder and optionally set ownership. -326 */ -327 private void createOutputPath(final Path path) throws IOException { -328 if (filesUser == null && filesGroup == null) { -329 outputFs.mkdirs(path); -330 } else { -331 Path parent = path.getParent(); -332 if (!outputFs.exists(parent) && !parent.isRoot()) { -333 createOutputPath(parent); -334 } -335 outputFs.mkdirs(path); -336 if (filesUser != null || filesGroup != null) { -337 // override the owner when non-null user/group is specified -338 outputFs.setOwner(path, filesUser, filesGroup); -339 } -340 if (filesMode > 0) { -341 outputFs.setPermission(path, new FsPermission(filesMode)); -342 } -343 } -344 } -345 -346 /** -347 * Try to Preserve the files attribute selected by the user copying them from the source file -348 * This is only required when you are exporting as a different user than "hbase" or on a system -349 * that doesn't have the "hbase" user. -350 * -351 * This is not considered a blocking failure since the user can force a chmod with the user -352 * that knows is available on the system. -353 */ -354 private boolean preserveAttributes(final Path path, final FileStatus refStat) { -355 FileStatus stat; -356 try { -357 stat = outputFs.getFileStatus(path); -358 } catch (IOException e) { -359 LOG.warn("Unable to get the status for file=" + path); -360 return false; -361 } -362 -363 try { -364 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { -365 outputFs.setPermission(path, new FsPermission(filesMode)); -366 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { -367 outputFs.setPermission(path, refStat.getPermission()); -368 } -369 } catch (IOException e) { -370 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); -371 return false; -372 } -373 -374 boolean hasRefStat = (refStat != null); -375 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); -376 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); -377 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { -378 try { -379 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { -380 outputFs.setOwner(path, user, group); -381 } -382 } catch (IOException e) { -383 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); -384 LOG.warn("The user/group may not exist on the destination cluster: user=" + -385 user + " group=" + group); -386 return false; -387 } -388 } -389 -390 return true; +314 } finally { +315 in.close(); +316 injectTestFailure(context, inputInfo); +317 } +318 } +319 +320 /** +321 * Create the output folder and optionally set ownership. +322 */ +323 private void createOutputPath(final Path path) throws IOException { +324 if (filesUser == null && filesGroup == null) { +325 outputFs.mkdirs(path); +326 } else { +327 Path parent = path.getParent(); +328 if (!outputFs.exists(parent) && !parent.isRoot()) { +329 createOutputPath(parent); +330 } +331 outputFs.mkdirs(path); +332 if (filesUser != null || filesGroup != null) { +333 // override the owner when non-null user/group is specified +334 outputFs.setOwner(path, filesUser, filesGroup); +335 } +336 if (filesMode > 0) { +337 outputFs.setPermission(path, new FsPermission(filesMode)); +338 } +339 } +340 } +341 +342 /** +343 * Try to Preserve the files attribute selected by the user copying them from the source file +344 * This is only required when you are exporting as a different user than "hbase" or on a system +345 * that doesn't have the "hbase" user. +346 * +347 * This is not considered a blocking failure since the user can force a chmod with the user +348 * that knows is available on the system. +349 */ +350 private boolean preserveAttributes(final Path path, final FileStatus refStat) { +351 FileStatus stat; +352 try { +353 stat = outputFs.getFileStatus(path); +354 } catch (IOException e) { +355 LOG.warn("Unable to get the status for file=" + path); +356 return false; +357 } +358 +359 try { +360 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { +361 outputFs.setPermission(path, new FsPermission(filesMode)); +362 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { +363 outputFs.setPermission(path, refStat.getPermission()); +364 } +365 } catch (IOException e) { +366 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); +367 return false; +368 } +369 +370 boolean hasRefStat = (refStat != null); +371 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); +372 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); +373 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { +374 try { +375 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { +376 outputFs.setOwner(path, user, group); +377 } +378 } catch (IOException e) { +379 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); +380 LOG.warn("The user/group may not exist on the destination cluster: user=" + +381 user + " group=" + group); +382 return false; +383 } +384 } +385 +386 return true; +387 } +388 +389 private boolean stringIsNotEmpty(final String str) { +390 return str != null && str.length() > 0; 391 } 392 -393 private boolean stringIsNotEmpty(final String str) { -394 return str != null && str.length() > 0; -395 } -396 -397 private void copyData(final Context context, -398 final Path inputPath, final InputStream in, -399 final Path outputPath, final FSDataOutputStream out, -400 final long inputFileSize) -401 throws IOException { -402 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + -403 " (%.1f%%)"; -404 -405 try { -406 byte[] buffer = new byte[bufferSize]; -407 long totalBytesWritten = 0; -408 int reportBytes = 0; -409 int bytesRead; -410 -411 long stime = System.currentTimeMillis(); -412 while ((bytesRead = in.read(buffer)) > 0) { -413 out.write(buffer, 0, bytesRead); -414 totalBytesWritten += bytesRead; -415 reportBytes += bytesRead; -416 -417 if (reportBytes >= REPORT_SIZE) { -418 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); -419 context.setStatus(String.format(statusMessage, -420 StringUtils.humanReadableInt(totalBytesWritten), -421 (totalBytesWritten/(float)inputFileSize) * 100.0f) + -422 " from " + inputPath + " to " + outputPath); -423 reportBytes = 0; -424 } -425 } -426 long etime = System.currentTimeMillis(); -427 -428 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); -429 context.setStatus(String.format(statusMessage, -430 StringUtils.humanReadableInt(totalBytesWritten), -431 (totalBytesWritten/(float)inputFileSize) * 100.0f) + -432 " from " + inputPath + " to " + outputPath); -433 -434 // Verify that the written size match -435 if (totalBytesWritten != inputFileSize) { -436 String msg = "number of bytes copied not matching copied=" + totalBytesWritten + -437 " expected=" + inputFileSize + " for file=" + inputPath; -438 throw new IOException(msg); -439 } -440 -441 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); -442 LOG.info("size=" + totalBytesWritten + -443 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" + -444 " time=" + StringUtils.formatTimeDiff(etime, stime) + -445 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0)); -446 context.getCounter(Counter.FILES_COPIED).increment(1); -447 } catch (IOException e) { -448 LOG.error("Error copying " + inputPath + " to " + outputPath, e); -449 context.getCounter(Counter.COPY_FAILED).increment(1); -450 throw e; -451 } -452 } -453 -454 /** -455 * Try to open the "source" file. -456 * Throws an IOException if the communication with the inputFs fail or -457 * if the file is not found. -458 */ -459 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) -460 throws IOException { -461 try { -462 Configuration conf = context.getConfiguration(); -463 FileLink link = null; -464 switch (fileInfo.getType()) { -465 case HFILE: -466 Path inputPath = new Path(fileInfo.getHfile()); -467 link = getFileLink(inputPath, conf); -468 break; -469 case WAL: -470 String serverName = fileInfo.getWalServer(); -471 String logName = fileInfo.getWalName(); -472 link = new WALLink(inputRoot, serverName, logName); -473 break; -474 default: -475 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); -476 } -477 return link.open(inputFs); -478 } catch (IOException e) { -479 context.getCounter(Counter.MISSING_FILES).increment(1); -480 LOG.error("Unable to open source file=" + fileInfo.toString(), e); -481 throw e; -482 } -483 } -484 -485 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) -486 throws IOException { -487 try { -488 Configuration conf = context.getConfiguration(); -489 FileLink link = null; -490 switch (fileInfo.getType()) { -491 case HFILE: -492 Path inputPath = new Path(fileInfo.getHfile()); -493 link = getFileLink(inputPath, conf); -494 break; -495 case WAL: -496 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); -497 break; -498 default: -499 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); -500 } -501 return link.getFileStatus(inputFs); -502 } catch (FileNotFoundException e) { -503 context.getCounter(Counter.MISSING_FILES).increment(1); -504 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); -505 throw e; -506 } catch (IOException e) { -507 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); -508 throw e; -509 } -510 } -511 -512 private FileLink getFileLink(Path path, Configuration conf) throws IOException{ -513 String regionName = HFileLink.getReferencedRegionName(path.getName()); -514 TableName tableName = HFileLink.getReferencedTableName(path.getName()); -515 if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { -516 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), -517 HFileArchiveUtil.getArchivePath(conf), path); -518 } -519 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); -520 } -521 -522 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { -523 try { -524 return fs.getFileChecksum(path); -525 } catch (IOException e) { -526 LOG.warn("Unable to get checksum for file=" + path, e); -527 return null; -528 } -529 } -530 -531 /** -532 * Check if the two files are equal by looking at the file length, -533 * and at the checksum (if user has specified the verifyChecksum flag). -534 */ -535 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { -536 // Not matching length -537 if (input