Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D15871100C for ; Thu, 12 Jun 2014 08:11:32 +0000 (UTC) Received: (qmail 61481 invoked by uid 500); 12 Jun 2014 08:11:32 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 61430 invoked by uid 500); 12 Jun 2014 08:11:32 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 61423 invoked by uid 99); 12 Jun 2014 08:11:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jun 2014 08:11:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BA0CB485D5; Thu, 12 Jun 2014 08:11:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbertozzi@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: HBASE-11326 Use an InputFormat for ExportSnapshot Date: Thu, 12 Jun 2014 08:11:31 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/master 95a7e7230 -> 8064bd4ff HBASE-11326 Use an InputFormat for ExportSnapshot Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8064bd4f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8064bd4f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8064bd4f Branch: refs/heads/master Commit: 8064bd4fff20f74d0e8c05044802837e6d5ba97f Parents: 95a7e72 Author: Matteo Bertozzi Authored: Thu Jun 12 09:06:00 2014 +0100 Committer: Matteo Bertozzi Committed: Thu Jun 12 09:07:36 2014 +0100 
---------------------------------------------------------------------- .../hadoop/hbase/snapshot/ExportSnapshot.java | 325 ++++++++++++------- .../hbase/snapshot/TestExportSnapshot.java | 21 +- 2 files changed, 214 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8064bd4f/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index 5215c41..a28eb92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.snapshot; import java.io.BufferedInputStream; import java.io.FileNotFoundException; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -62,8 +64,14 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; @@ -84,6 +92,10 @@ import org.apache.hadoop.util.ToolRunner; public final 
class ExportSnapshot extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(ExportSnapshot.class); + private static final String MR_NUM_MAPS = "mapreduce.job.maps"; + private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; + private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; + private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; @@ -456,19 +468,23 @@ public final class ExportSnapshot extends Configured implements Tool { } } + // ========================================================================== + // Input Format + // ========================================================================== + /** * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce. 
* @return list of files referenced by the snapshot (pair of path and size) */ - private List> getSnapshotFiles(final FileSystem fs, - final Path snapshotDir) throws IOException { + private static List> getSnapshotFiles(final Configuration conf, + final FileSystem fs, final Path snapshotDir) throws IOException { SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); final List> files = new ArrayList>(); final TableName table = TableName.valueOf(snapshotDesc.getTable()); - final Configuration conf = getConf(); // Get snapshot files + LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, new SnapshotReferenceUtil.SnapshotVisitor() { @Override @@ -486,7 +502,12 @@ public final class ExportSnapshot extends Configured implements Tool { .setHfile(path.toString()) .build(); - long size = new HFileLink(conf, path).getFileStatus(fs).getLen(); + long size; + if (storeFile.hasFileSize()) { + size = storeFile.getFileSize(); + } else { + size = new HFileLink(conf, path).getFileStatus(fs).getLen(); + } files.add(new Pair(fileInfo, size)); } } @@ -516,7 +537,7 @@ public final class ExportSnapshot extends Configured implements Tool { * and then each group fetch the bigger file available, iterating through groups * alternating the direction. 
*/ - static List> getBalancedSplits( + static List>> getBalancedSplits( final List> files, final int ngroups) { // Sort files by size, from small to big Collections.sort(files, new Comparator>() { @@ -527,18 +548,19 @@ public final class ExportSnapshot extends Configured implements Tool { }); // create balanced groups - List> fileGroups = new LinkedList>(); + List>> fileGroups = + new LinkedList>>(); long[] sizeGroups = new long[ngroups]; int hi = files.size() - 1; int lo = 0; - List group; + List> group; int dir = 1; int g = 0; while (hi >= lo) { if (g == fileGroups.size()) { - group = new LinkedList(); + group = new LinkedList>(); fileGroups.add(group); } else { group = fileGroups.get(g); @@ -548,7 +570,7 @@ public final class ExportSnapshot extends Configured implements Tool { // add the hi one sizeGroups[g] += fileInfo.getSecond(); - group.add(fileInfo.getFirst()); + group.add(fileInfo); // change direction when at the end or the beginning g += dir; @@ -570,102 +592,177 @@ public final class ExportSnapshot extends Configured implements Tool { return fileGroups; } - private static Path getInputFolderPath(Configuration conf) - throws IOException, InterruptedException { - Path stagingDir = JobUtil.getStagingDir(conf); - return new Path(stagingDir, INPUT_FOLDER_PREFIX + - String.valueOf(EnvironmentEdgeManager.currentTimeMillis())); - } + private static class ExportSnapshotInputFormat extends InputFormat { + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext tac) throws IOException, InterruptedException { + return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys()); + } - /** - * Create the input files, with the path to copy, for the MR job. - * Each input files contains n files, and each input file has a similar amount data to copy. - * The number of input files created are based on the number of mappers provided as argument - * and the number of the files to copy. 
- */ - private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath, - final List> snapshotFiles, int mappers) - throws IOException, InterruptedException { - FileSystem fs = inputFolderPath.getFileSystem(conf); - LOG.debug("Input folder location: " + inputFolderPath); - - List> splits = getBalancedSplits(snapshotFiles, mappers); - Path[] inputFiles = new Path[splits.size()]; - - BytesWritable key = new BytesWritable(); - for (int i = 0; i < inputFiles.length; i++) { - List files = splits.get(i); - inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i)); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i], - BytesWritable.class, NullWritable.class); - LOG.debug("Input split: " + i); - try { - for (SnapshotFileInfo file: files) { - byte[] pbFileInfo = file.toByteArray(); - key.set(pbFileInfo, 0, pbFileInfo.length); - writer.append(key, NullWritable.get()); + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + String snapshotName = conf.get(CONF_SNAPSHOT_NAME); + Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); + FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); + + List> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); + int mappers = conf.getInt(CONF_NUM_SPLITS, 0); + if (mappers == 0 && snapshotFiles.size() > 0) { + mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); + mappers = Math.min(mappers, snapshotFiles.size()); + conf.setInt(CONF_NUM_SPLITS, mappers); + conf.setInt(MR_NUM_MAPS, mappers); + } + + List>> groups = getBalancedSplits(snapshotFiles, mappers); + List splits = new ArrayList(groups.size()); + for (List> files: groups) { + splits.add(new ExportSnapshotInputSplit(files)); + } + return splits; + } + + private static class ExportSnapshotInputSplit extends InputSplit implements Writable { + private List> files; + private long length; 
+ + public ExportSnapshotInputSplit() { + this.files = null; + } + + public ExportSnapshotInputSplit(final List> snapshotFiles) { + this.files = new ArrayList(snapshotFiles.size()); + for (Pair fileInfo: snapshotFiles) { + this.files.add(new Pair( + new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); + this.length += fileInfo.getSecond(); + } + } + + private List> getSplitKeys() { + return files; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return length; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[] {}; + } + + @Override + public void readFields(DataInput in) throws IOException { + int count = in.readInt(); + files = new ArrayList>(count); + length = 0; + for (int i = 0; i < count; ++i) { + BytesWritable fileInfo = new BytesWritable(); + fileInfo.readFields(in); + long size = in.readLong(); + files.add(new Pair(fileInfo, size)); + length += size; + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(files.size()); + for (final Pair fileInfo: files) { + fileInfo.getFirst().write(out); + out.writeLong(fileInfo.getSecond()); } - } finally { - writer.close(); } } - return inputFiles; + private static class ExportSnapshotRecordReader + extends RecordReader { + private final List> files; + private long totalSize = 0; + private long procSize = 0; + private int index = -1; + + ExportSnapshotRecordReader(final List> files) { + this.files = files; + for (Pair fileInfo: files) { + totalSize += fileInfo.getSecond(); + } + } + + @Override + public void close() { } + + @Override + public BytesWritable getCurrentKey() { return files.get(index).getFirst(); } + + @Override + public NullWritable getCurrentValue() { return NullWritable.get(); } + + @Override + public float getProgress() { return (float)procSize / totalSize; } + + @Override + public void initialize(InputSplit split, TaskAttemptContext tac) 
{ } + + @Override + public boolean nextKeyValue() { + if (index >= 0) { + procSize += files.get(index).getSecond(); + } + return(++index < files.size()); + } + } } + // ========================================================================== + // Tool + // ========================================================================== + /** * Run Map-Reduce Job to perform the files copy. */ private void runCopyJob(final Path inputRoot, final Path outputRoot, - final List> snapshotFiles, final boolean verifyChecksum, + final String snapshotName, final Path snapshotDir, final boolean verifyChecksum, final String filesUser, final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = getConf(); if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); + if (mappers > 0) { + conf.setInt(CONF_NUM_SPLITS, mappers); + conf.setInt(MR_NUM_MAPS, mappers); + } conf.setInt(CONF_FILES_MODE, filesMode); conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); conf.set(CONF_INPUT_ROOT, inputRoot.toString()); - conf.setInt("mapreduce.job.maps", mappers); conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); + conf.set(CONF_SNAPSHOT_NAME, snapshotName); + conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); Job job = new Job(conf); - job.setJobName("ExportSnapshot"); + job.setJobName("ExportSnapshot-" + snapshotName); job.setJarByClass(ExportSnapshot.class); TableMapReduceUtil.addDependencyJars(job); job.setMapperClass(ExportMapper.class); - job.setInputFormatClass(SequenceFileInputFormat.class); + job.setInputFormatClass(ExportSnapshotInputFormat.class); job.setOutputFormatClass(NullOutputFormat.class); job.setMapSpeculativeExecution(false); job.setNumReduceTasks(0); - // Create MR Input - Path inputFolderPath = getInputFolderPath(conf); - for (Path path: 
createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) { - LOG.debug("Add Input Path=" + path); - SequenceFileInputFormat.addInputPath(job, path); - } + // Acquire the delegation Tokens + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + new Path[] { inputRoot, outputRoot }, conf); - try { - // Acquire the delegation Tokens - TokenCache.obtainTokensForNamenodes(job.getCredentials(), - new Path[] { inputRoot, outputRoot }, conf); - - // Run the MR Job - if (!job.waitForCompletion(true)) { - // TODO: Replace the fixed string with job.getStatus().getFailureInfo() - // when it will be available on all the supported versions. - throw new ExportSnapshotException("Copy Files Map-Reduce Job failed"); - } - } finally { - // Remove MR Input - try { - inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true); - } catch (IOException e) { - LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e); - } + // Run the MR Job + if (!job.waitForCompletion(true)) { + // TODO: Replace the fixed string with job.getStatus().getFailureInfo() + // when it will be available on all the supported versions. 
+ throw new ExportSnapshotException("Copy Files Map-Reduce Job failed"); } } @@ -698,45 +795,39 @@ public final class ExportSnapshot extends Configured implements Tool { int mappers = 0; Configuration conf = getConf(); + Path inputRoot = FSUtils.getRootDir(conf); // Process command line args for (int i = 0; i < args.length; i++) { String cmd = args[i]; - try { - if (cmd.equals("-snapshot")) { - snapshotName = args[++i]; - } else if (cmd.equals("-target")) { - targetName = args[++i]; - } else if (cmd.equals("-copy-to")) { - outputRoot = new Path(args[++i]); - } else if (cmd.equals("-copy-from")) { - Path sourceDir = new Path(args[++i]); - URI defaultFs = sourceDir.getFileSystem(conf).getUri(); - FSUtils.setFsDefault(conf, new Path(defaultFs)); - FSUtils.setRootDir(conf, sourceDir); - } else if (cmd.equals("-no-checksum-verify")) { - verifyChecksum = false; - } else if (cmd.equals("-no-target-verify")) { - verifyTarget = false; - } else if (cmd.equals("-mappers")) { - mappers = Integer.parseInt(args[++i]); - } else if (cmd.equals("-chuser")) { - filesUser = args[++i]; - } else if (cmd.equals("-chgroup")) { - filesGroup = args[++i]; - } else if (cmd.equals("-bandwidth")) { - bandwidthMB = Integer.parseInt(args[++i]); - } else if (cmd.equals("-chmod")) { - filesMode = Integer.parseInt(args[++i], 8); - } else if (cmd.equals("-overwrite")) { - overwrite = true; - } else if (cmd.equals("-h") || cmd.equals("--help")) { - printUsageAndExit(); - } else { - System.err.println("UNEXPECTED: " + cmd); - printUsageAndExit(); - } - } catch (IOException e) { + if (cmd.equals("-snapshot")) { + snapshotName = args[++i]; + } else if (cmd.equals("-target")) { + targetName = args[++i]; + } else if (cmd.equals("-copy-to")) { + outputRoot = new Path(args[++i]); + } else if (cmd.equals("-copy-from")) { + inputRoot = new Path(args[++i]); + } else if (cmd.equals("-no-checksum-verify")) { + verifyChecksum = false; + } else if (cmd.equals("-no-target-verify")) { + verifyTarget = false; + } 
else if (cmd.equals("-mappers")) { + mappers = Integer.parseInt(args[++i]); + } else if (cmd.equals("-chuser")) { + filesUser = args[++i]; + } else if (cmd.equals("-chgroup")) { + filesGroup = args[++i]; + } else if (cmd.equals("-bandwidth")) { + bandwidthMB = Integer.parseInt(args[++i]); + } else if (cmd.equals("-chmod")) { + filesMode = Integer.parseInt(args[++i], 8); + } else if (cmd.equals("-overwrite")) { + overwrite = true; + } else if (cmd.equals("-h") || cmd.equals("--help")) { + printUsageAndExit(); + } else { + System.err.println("UNEXPECTED: " + cmd); printUsageAndExit(); } } @@ -756,7 +847,6 @@ public final class ExportSnapshot extends Configured implements Tool { targetName = snapshotName; } - Path inputRoot = FSUtils.getRootDir(conf); FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf); LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot); FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf); @@ -800,14 +890,6 @@ public final class ExportSnapshot extends Configured implements Tool { } } - // Step 0 - Extract snapshot files to copy - LOG.info("Loading Snapshot hfile list"); - final List> files = getSnapshotFiles(inputFs, snapshotDir); - if (mappers == 0 && files.size() > 0) { - mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10)); - mappers = Math.min(mappers, files.size()); - } - // Step 1 - Copy fs1:/.snapshot/ to fs2:/.snapshot/.tmp/ // The snapshot references must be copied before the hfiles otherwise the cleaner // will remove them because they are unreferenced. @@ -833,13 +915,8 @@ public final class ExportSnapshot extends Configured implements Tool { // The snapshot references must be copied before the files otherwise the files gets removed // by the HFileArchiver, since they have no references. try { - if (files.size() == 0) { - LOG.warn("There are 0 store file to be copied. 
There may be no data in the table."); - } else { - runCopyJob(inputRoot, outputRoot, files, verifyChecksum, - filesUser, filesGroup, filesMode, mappers, bandwidthMB); - } - + runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, + filesUser, filesGroup, filesMode, mappers, bandwidthMB); LOG.info("Finalize the Snapshot Export"); if (!skipTmp) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8064bd4f/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java index 544dcda..c4fc0a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java @@ -163,26 +163,31 @@ public class TestExportSnapshot { // group 2: 18, 13, 8, 3 (total size: 42) // group 3: 17, 12, 7, 4 (total size: 42) // group 4: 16, 11, 6, 5 (total size: 42) - List> splits = ExportSnapshot.getBalancedSplits(files, 5); + List>> splits = ExportSnapshot.getBalancedSplits(files, 5); assertEquals(5, splits.size()); String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"}; - verifyBalanceSplit(splits.get(0), split0); + verifyBalanceSplit(splits.get(0), split0, 42); String[] split1 = new String[] {"file-19", "file-12", "file-9", "file-2"}; - verifyBalanceSplit(splits.get(1), split1); + verifyBalanceSplit(splits.get(1), split1, 42); String[] split2 = new String[] {"file-18", "file-13", "file-8", "file-3"}; - verifyBalanceSplit(splits.get(2), split2); + verifyBalanceSplit(splits.get(2), split2, 42); String[] split3 = new String[] {"file-17", "file-14", "file-7", "file-4"}; - verifyBalanceSplit(splits.get(3), split3); + verifyBalanceSplit(splits.get(3), split3, 42); 
String[] split4 = new String[] {"file-16", "file-15", "file-6", "file-5"}; - verifyBalanceSplit(splits.get(4), split4); + verifyBalanceSplit(splits.get(4), split4, 42); } - private void verifyBalanceSplit(final List<SnapshotFileInfo> split, final String[] expected) { + private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split, + final String[] expected, final long expectedSize) { assertEquals(expected.length, split.size()); + long totalSize = 0; for (int i = 0; i < expected.length; ++i) { - assertEquals(expected[i], split.get(i).getHfile()); + Pair<SnapshotFileInfo, Long> fileInfo = split.get(i); + assertEquals(expected[i], fileInfo.getFirst().getHfile()); + totalSize += fileInfo.getSecond(); } + assertEquals(expectedSize, totalSize); } /**