From: sunilg@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 24 Nov 2017 10:43:31 -0000
Subject: [20/49] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
new file mode 100644
index 0000000..4d7f524
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -0,0 +1,1371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.test.LambdaTestUtils.*; + +/** + * Test the job/task commit actions of an S3A Committer, including trying to + * simulate some failure and retry conditions. + * Derived from + * {@code org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter}. + * + * This is a complex test suite as it tries to explore the full lifecycle + * of committers, and is designed for subclassing. + */ +@SuppressWarnings({"unchecked", "ThrowableNotThrown", "unused"}) +public abstract class AbstractITCommitProtocol extends AbstractCommitITest { + private Path outDir; + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractITCommitProtocol.class); + + private static final String SUB_DIR = "SUB_DIR"; + + protected static final String PART_00000 = "part-m-00000"; + + /** + * Counter to guarantee that even in parallel test runs, no job has the same + * ID. + */ + + private String jobId; + + // A random task attempt id for testing. + private String attempt0; + private TaskAttemptID taskAttempt0; + + private String attempt1; + private TaskAttemptID taskAttempt1; + + private static final Text KEY_1 = new Text("key1"); + private static final Text KEY_2 = new Text("key2"); + private static final Text VAL_1 = new Text("val1"); + private static final Text VAL_2 = new Text("val2"); + + /** A job to abort in test case teardown. 
*/ + private List abortInTeardown = new ArrayList<>(1); + + private final StandardCommitterFactory + standardCommitterFactory = new StandardCommitterFactory(); + + private void cleanupDestDir() throws IOException { + rmdir(this.outDir, getConfiguration()); + } + + /** + * This must return the name of a suite which is unique to the non-abstract + * test. + * @return a string which must be unique and a valid path. + */ + protected abstract String suitename(); + + /** + * Get the log; can be overridden for test case log. + * @return a log. + */ + public Logger log() { + return LOG; + } + + /** + * Overridden method returns the suitename as well as the method name, + * so if more than one committer test is run in parallel, paths are + * isolated. + * @return a name for a method, unique across the suites and test cases. + */ + @Override + protected String getMethodName() { + return suitename() + "-" + super.getMethodName(); + } + + @Override + public void setup() throws Exception { + super.setup(); + jobId = randomJobId(); + attempt0 = "attempt_" + jobId + "_m_000000_0"; + taskAttempt0 = TaskAttemptID.forName(attempt0); + attempt1 = "attempt_" + jobId + "_m_000001_0"; + taskAttempt1 = TaskAttemptID.forName(attempt1); + + outDir = path(getMethodName()); + S3AFileSystem fileSystem = getFileSystem(); + bindFileSystem(fileSystem, outDir, fileSystem.getConf()); + abortMultipartUploadsUnderPath(outDir); + cleanupDestDir(); + } + + /** + * Create a random Job ID using the fork ID as part of the number. + * @return fork ID string in a format parseable by Jobs + * @throws Exception failure + */ + private String randomJobId() throws Exception { + String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001"); + int l = testUniqueForkId.length(); + String trailingDigits = testUniqueForkId.substring(l - 4, l); + try { + int digitValue = Integer.valueOf(trailingDigits); + return String.format("20070712%04d_%04d", + (long)(Math.random() * 1000), + digitValue); + } catch (NumberFormatException e) { + throw new Exception("Failed to parse " + trailingDigits, e); + } + } + + @Override + public void teardown() throws Exception { + describe("teardown"); + abortInTeardown.forEach(this::abortJobQuietly); + if (outDir != null) { + try { + abortMultipartUploadsUnderPath(outDir); + cleanupDestDir(); + } catch (IOException e) { + log().info("Exception during cleanup", e); + } + } + S3AFileSystem fileSystem = getFileSystem(); + if (fileSystem != null) { + log().info("Statistics for {}:\n{}", fileSystem.getUri(), + fileSystem.getInstrumentation().dump(" ", " = ", "\n", true)); + } + + super.teardown(); + } + + /** + * Add the specified job to the current list of jobs to abort in teardown. + * @param jobData job data. + */ + protected void abortInTeardown(JobData jobData) { + abortInTeardown.add(jobData); + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + bindCommitter(conf); + return conf; + } + + /** + * Bind a path to the FS in the cache. + * @param fs filesystem + * @param path s3 path + * @param conf configuration + * @throws IOException any problem + */ + private void bindFileSystem(FileSystem fs, Path path, Configuration conf) + throws IOException { + FileSystemTestHelper.addFileSystemForTesting(path.toUri(), conf, fs); + } + + /*** + * Bind to the committer from the methods of + * {@link #getCommitterFactoryName()} and {@link #getCommitterName()}. 
+ * @param conf configuration to set up + */ + protected void bindCommitter(Configuration conf) { + super.bindCommitter(conf, getCommitterFactoryName(), getCommitterName()); + } + + /** + * Create a committer for a task. + * @param context task context + * @return new committer + * @throws IOException failure + */ + protected AbstractS3ACommitter createCommitter( + TaskAttemptContext context) throws IOException { + return createCommitter(getOutDir(), context); + } + + /** + * Create a committer for a task and a given output path. + * @param outputPath path + * @param context task context + * @return new committer + * @throws IOException failure + */ + protected abstract AbstractS3ACommitter createCommitter( + Path outputPath, + TaskAttemptContext context) throws IOException; + + + protected String getCommitterFactoryName() { + return CommitConstants.S3A_COMMITTER_FACTORY; + } + + protected abstract String getCommitterName(); + + protected Path getOutDir() { + return outDir; + } + + protected String getJobId() { + return jobId; + } + + protected String getAttempt0() { + return attempt0; + } + + protected TaskAttemptID getTaskAttempt0() { + return taskAttempt0; + } + + protected String getAttempt1() { + return attempt1; + } + + protected TaskAttemptID getTaskAttempt1() { + return taskAttempt1; + } + + /** + * Functional interface for creating committers, designed to allow + * different factories to be used to create different failure modes. + */ + @FunctionalInterface + public interface CommitterFactory { + + /** + * Create a committer for a task. + * @param context task context + * @return new committer + * @throws IOException failure + */ + AbstractS3ACommitter createCommitter( + TaskAttemptContext context) throws IOException; + } + + /** + * The normal committer creation factory, uses the abstract methods + * in the class. + */ + public class StandardCommitterFactory implements CommitterFactory { + @Override + public AbstractS3ACommitter createCommitter(TaskAttemptContext context) + throws IOException { + return AbstractITCommitProtocol.this.createCommitter(context); + } + } + + /** + * Write some text out. + * @param context task + * @throws IOException IO failure + * @throws InterruptedException write interrupted + */ + protected void writeTextOutput(TaskAttemptContext context) + throws IOException, InterruptedException { + describe("write output"); + try (DurationInfo d = new DurationInfo(LOG, + "Writing Text output for task %s", context.getTaskAttemptID())) { + writeOutput(new LoggingTextOutputFormat().getRecordWriter(context), + context); + } + } + + /** + * Write the standard output. + * @param writer record writer + * @param context task context + * @throws IOException IO failure + * @throws InterruptedException write interrupted + */ + private void writeOutput(RecordWriter writer, + TaskAttemptContext context) throws IOException, InterruptedException { + NullWritable nullWritable = NullWritable.get(); + try(CloseWriter cw = new CloseWriter(writer, context)) { + writer.write(KEY_1, VAL_1); + writer.write(null, nullWritable); + writer.write(null, VAL_1); + writer.write(nullWritable, VAL_2); + writer.write(KEY_2, nullWritable); + writer.write(KEY_1, null); + writer.write(null, null); + writer.write(KEY_2, VAL_2); + writer.close(context); + } + } + + /** + * Write the output of a map. 
+ * @param writer record writer + * @param context task context + * @throws IOException IO failure + * @throws InterruptedException write interrupted + */ + private void writeMapFileOutput(RecordWriter writer, + TaskAttemptContext context) throws IOException, InterruptedException { + describe("\nWrite map output"); + try (DurationInfo d = new DurationInfo(LOG, + "Writing Text output for task %s", context.getTaskAttemptID()); + CloseWriter cw = new CloseWriter(writer, context)) { + for (int i = 0; i < 10; ++i) { + Text val = ((i & 1) == 1) ? VAL_1 : VAL_2; + writer.write(new LongWritable(i), val); + } + writer.close(context); + } + } + + /** + * Details on a job for use in {@code startJob} and elsewhere. + */ + public static class JobData { + private final Job job; + private final JobContext jContext; + private final TaskAttemptContext tContext; + private final AbstractS3ACommitter committer; + private final Configuration conf; + + public JobData(Job job, + JobContext jContext, + TaskAttemptContext tContext, + AbstractS3ACommitter committer) { + this.job = job; + this.jContext = jContext; + this.tContext = tContext; + this.committer = committer; + conf = job.getConfiguration(); + } + } + + /** + * Create a new job. Sets the task attempt ID, + * and output dir; asks for a success marker. + * @return the new job + * @throws IOException failure + */ + public Job newJob() throws IOException { + return newJob(outDir, getConfiguration(), attempt0); + } + + /** + * Create a new job. Sets the task attempt ID, + * and output dir; asks for a success marker. + * @param dir dest dir + * @param configuration config to get the job from + * @param taskAttemptId task attempt + * @return the new job + * @throws IOException failure + */ + private Job newJob(Path dir, Configuration configuration, + String taskAttemptId) throws IOException { + Job job = Job.getInstance(configuration); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId); + conf.setBoolean(CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true); + FileOutputFormat.setOutputPath(job, dir); + return job; + } + + /** + * Start a job with a committer; optionally write the test data. + * Always register the job to be aborted (quietly) in teardown. + * This is, from an "OO-purity perspective" the wrong kind of method to + * do: it's setting things up, mixing functionality, registering for teardown. + * Its aim is simple though: a common body of code for starting work + * in test cases. + * @param writeText should the text be written? + * @return the job data 4-tuple + * @throws IOException IO problems + * @throws InterruptedException interruption during write + */ + protected JobData startJob(boolean writeText) + throws IOException, InterruptedException { + return startJob(standardCommitterFactory, writeText); + } + + /** + * Start a job with a committer; optionally write the test data. + * Always register the job to be aborted (quietly) in teardown. + * This is, from an "OO-purity perspective" the wrong kind of method to + * do: it's setting things up, mixing functionality, registering for teardown. + * Its aim is simple though: a common body of code for starting work + * in test cases. + * @param factory the committer factory to use + * @param writeText should the text be written? 
+ * @return the job data 4-tuple + * @throws IOException IO problems + * @throws InterruptedException interruption during write + */ + protected JobData startJob(CommitterFactory factory, boolean writeText) + throws IOException, InterruptedException { + Job job = newJob(); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, + taskAttempt0); + AbstractS3ACommitter committer = factory.createCommitter(tContext); + + // setup + JobData jobData = new JobData(job, jContext, tContext, committer); + setup(jobData); + abortInTeardown(jobData); + + if (writeText) { + // write output + writeTextOutput(tContext); + } + return jobData; + } + + /** + * Set up the job and task. + * @param jobData job data + * @throws IOException problems + */ + protected void setup(JobData jobData) throws IOException { + AbstractS3ACommitter committer = jobData.committer; + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + describe("\nsetup job"); + try (DurationInfo d = new DurationInfo(LOG, + "setup job %s", jContext.getJobID())) { + committer.setupJob(jContext); + } + try (DurationInfo d = new DurationInfo(LOG, + "setup task %s", tContext.getTaskAttemptID())) { + committer.setupTask(tContext); + } + describe("setup complete\n"); + } + + /** + * Abort a job quietly. + * @param jobData job info + */ + protected void abortJobQuietly(JobData jobData) { + abortJobQuietly(jobData.committer, jobData.jContext, jobData.tContext); + } + + /** + * Abort a job quietly: first task, then job. + * @param committer committer + * @param jContext job context + * @param tContext task context + */ + protected void abortJobQuietly(AbstractS3ACommitter committer, + JobContext jContext, + TaskAttemptContext tContext) { + describe("\naborting task"); + try { + committer.abortTask(tContext); + } catch (IOException e) { + log().warn("Exception aborting task:", e); + } + describe("\naborting job"); + try { + committer.abortJob(jContext, JobStatus.State.KILLED); + } catch (IOException e) { + log().warn("Exception aborting job", e); + } + } + + /** + * Commit up the task and then the job. + * @param committer committer + * @param jContext job context + * @param tContext task context + * @throws IOException problems + */ + protected void commit(AbstractS3ACommitter committer, + JobContext jContext, + TaskAttemptContext tContext) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "committing work", jContext.getJobID())) { + describe("\ncommitting task"); + committer.commitTask(tContext); + describe("\ncommitting job"); + committer.commitJob(jContext); + describe("commit complete\n"); + } + } + + /** + * Execute work as part of a test, after creating the job. + * After the execution, {@link #abortJobQuietly(JobData)} is + * called for abort/cleanup. + * @param name name of work (for logging) + * @param action action to execute + * @throws Exception failure + */ + protected void executeWork(String name, ActionToTest action) + throws Exception { + executeWork(name, startJob(false), action); + } + + /** + * Execute work as part of a test, against the created job. + * After the execution, {@link #abortJobQuietly(JobData)} is + * called for abort/cleanup. 
+ * @param name name of work (for logging) + * @param jobData job info + * @param action action to execute + * @throws Exception failure + */ + public void executeWork(String name, + JobData jobData, + ActionToTest action) throws Exception { + try (DurationInfo d = new DurationInfo(LOG, "Executing %s", name)) { + action.exec(jobData.job, + jobData.jContext, + jobData.tContext, + jobData.committer); + } finally { + abortJobQuietly(jobData); + } + } + + /** + * Verify that recovery doesn't work for these committers. + */ + @Test + @SuppressWarnings("deprecation") + public void testRecoveryAndCleanup() throws Exception { + describe("Test (unsupported) task recovery."); + JobData jobData = startJob(true); + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + assertNotNull("null workPath in committer " + committer, + committer.getWorkPath()); + assertNotNull("null outputPath in committer " + committer, + committer.getOutputPath()); + + // Commit the task. This will promote data and metadata to where + // job commits will pick it up on commit or abort. + committer.commitTask(tContext); + assertTaskAttemptPathDoesNotExist(committer, tContext); + + Configuration conf2 = jobData.job.getConfiguration(); + conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2); + JobContext jContext2 = new JobContextImpl(conf2, taskAttempt0.getJobID()); + TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, + taskAttempt0); + AbstractS3ACommitter committer2 = createCommitter(tContext2); + committer2.setupJob(tContext2); + + assertFalse("recoverySupported in " + committer2, + committer2.isRecoverySupported()); + intercept(PathCommitException.class, "recover", + () -> committer2.recoverTask(tContext2)); + + // at this point, task attempt 0 has failed to recover + // it should be abortable though. This will be a no-op as it already + // committed + describe("aborting task attempt 2; expect nothing to clean up"); + committer2.abortTask(tContext2); + describe("Aborting job 2; expect pending commits to be aborted"); + committer2.abortJob(jContext2, JobStatus.State.KILLED); + // now, state of system may still have pending data + assertNoMultipartUploadsPending(outDir); + } + + protected void assertTaskAttemptPathDoesNotExist( + AbstractS3ACommitter committer, TaskAttemptContext context) + throws IOException { + Path attemptPath = committer.getTaskAttemptPath(context); + ContractTestUtils.assertPathDoesNotExist( + attemptPath.getFileSystem(context.getConfiguration()), + "task attempt dir", + attemptPath); + } + + protected void assertJobAttemptPathDoesNotExist( + AbstractS3ACommitter committer, JobContext context) + throws IOException { + Path attemptPath = committer.getJobAttemptPath(context); + ContractTestUtils.assertPathDoesNotExist( + attemptPath.getFileSystem(context.getConfiguration()), + "job attempt dir", + attemptPath); + } + + /** + * Verify the output of the directory. + * That includes the {@code part-m-00000-*} + * file existence and contents, as well as optionally, the success marker. + * @param dir directory to scan. + * @param expectSuccessMarker check the success marker? + * @throws Exception failure. 
+ */ + private void validateContent(Path dir, boolean expectSuccessMarker) + throws Exception { + if (expectSuccessMarker) { + verifySuccessMarker(dir); + } + Path expectedFile = getPart0000(dir); + log().debug("Validating content in {}", expectedFile); + StringBuffer expectedOutput = new StringBuffer(); + expectedOutput.append(KEY_1).append('\t').append(VAL_1).append("\n"); + expectedOutput.append(VAL_1).append("\n"); + expectedOutput.append(VAL_2).append("\n"); + expectedOutput.append(KEY_2).append("\n"); + expectedOutput.append(KEY_1).append("\n"); + expectedOutput.append(KEY_2).append('\t').append(VAL_2).append("\n"); + String output = readFile(expectedFile); + assertEquals("Content of " + expectedFile, + expectedOutput.toString(), output); + } + + /** + * Identify any path under the directory which begins with the + * {@code "part-m-00000"} sequence. There's some compensation for + * eventual consistency here. + * @param dir directory to scan + * @return the full path + * @throws FileNotFoundException the path is missing. + * @throws Exception failure. + */ + protected Path getPart0000(final Path dir) throws Exception { + final FileSystem fs = dir.getFileSystem(getConfiguration()); + return eventually(CONSISTENCY_WAIT, CONSISTENCY_PROBE_INTERVAL, + () -> getPart0000Immediately(fs, dir)); + } + + /** + * Identify any path under the directory which begins with the + * {@code "part-m-00000"} sequence. There's some compensation for + * eventual consistency here. + * @param fs FS to probe + * @param dir directory to scan + * @return the full path + * @throws FileNotFoundException the path is missing. + * @throws IOException failure. + */ + private Path getPart0000Immediately(FileSystem fs, Path dir) + throws IOException { + FileStatus[] statuses = fs.listStatus(dir, + path -> path.getName().startsWith(PART_00000)); + if (statuses.length != 1) { + // fail, with a listing of the parent dir + ContractTestUtils.assertPathExists(fs, "Output file", + new Path(dir, PART_00000)); + } + return statuses[0].getPath(); + } + + /** + * Look for the partFile subdir of the output dir. + * @param fs filesystem + * @param dir output dir + * @throws Exception failure. + */ + private void validateMapFileOutputContent( + FileSystem fs, Path dir) throws Exception { + // map output is a directory with index and data files + assertPathExists("Map output", dir); + Path expectedMapDir = getPart0000(dir); + assertPathExists("Map output", expectedMapDir); + assertIsDirectory(expectedMapDir); + FileStatus[] files = fs.listStatus(expectedMapDir); + assertTrue("No files found in " + expectedMapDir, files.length > 0); + assertPathExists("index file in " + expectedMapDir, + new Path(expectedMapDir, MapFile.INDEX_FILE_NAME)); + assertPathExists("data file in " + expectedMapDir, + new Path(expectedMapDir, MapFile.DATA_FILE_NAME)); + } + + /** + * Dump all MPUs in the filesystem. + * @throws IOException IO failure + */ + protected void dumpMultipartUploads() throws IOException { + countMultipartUploads(""); + } + + /** + * Full test of the expected lifecycle: start job, task, write, commit task, + * commit job. 
+ * @throws Exception on a failure + */ + @Test + public void testCommitLifecycle() throws Exception { + describe("Full test of the expected lifecycle:\n" + + " start job, task, write, commit task, commit job.\n" + + "Verify:\n" + + "* no files are visible after task commit\n" + + "* the expected file is visible after job commit\n" + + "* no outstanding MPUs after job commit"); + JobData jobData = startJob(false); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + // write output + describe("1. Writing output"); + writeTextOutput(tContext); + + dumpMultipartUploads(); + describe("2. Committing task"); + assertTrue("No files to commit were found by " + committer, + committer.needsTaskCommit(tContext)); + committer.commitTask(tContext); + + // this is only task commit; there MUST be no part- files in the dest dir + waitForConsistency(); + try { + applyLocatedFiles(getFileSystem().listFiles(outDir, false), + (status) -> + assertFalse("task committed file to dest :" + status, + status.getPath().toString().contains("part"))); + } catch (FileNotFoundException ignored) { + log().info("Outdir {} is not created by task commit phase ", + outDir); + } + + describe("3. Committing job"); + assertMultipartUploadsPending(outDir); + committer.commitJob(jContext); + + // validate output + describe("4. Validating content"); + validateContent(outDir, shouldExpectSuccessMarker()); + assertNoMultipartUploadsPending(outDir); + } + + @Test + public void testCommitterWithDuplicatedCommit() throws Exception { + describe("Call a task then job commit twice;" + + "expect the second task commit to fail."); + JobData jobData = startJob(true); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + // do commit + commit(committer, jContext, tContext); + + // validate output + validateContent(outDir, shouldExpectSuccessMarker()); + + assertNoMultipartUploadsPending(outDir); + + // commit task to fail on retry + expectFNFEonTaskCommit(committer, tContext); + } + + protected boolean shouldExpectSuccessMarker() { + return true; + } + + /** + * Simulate a failure on the first job commit; expect the + * second to succeed. + */ + @Test + public void testCommitterWithFailure() throws Exception { + describe("Fail the first job commit then retry"); + JobData jobData = startJob(new FailingCommitterFactory(), true); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + // do commit + committer.commitTask(tContext); + + // now fail job + expectSimulatedFailureOnJobCommit(jContext, committer); + + committer.commitJob(jContext); + + // but the data got there, due to the order of operations. + validateContent(outDir, shouldExpectSuccessMarker()); + expectJobCommitToFail(jContext, committer); + } + + /** + * Override point: the failure expected on the attempt to commit a failed + * job. + * @param jContext job context + * @param committer committer + * @throws Exception any unexpected failure. + */ + protected void expectJobCommitToFail(JobContext jContext, + AbstractS3ACommitter committer) throws Exception { + // next attempt will fail as there is no longer a directory to commit + expectJobCommitFailure(jContext, committer, + FileNotFoundException.class); + } + + /** + * Expect a job commit operation to fail with a specific exception. 
+ * @param jContext job context + * @param committer committer + * @param clazz class of exception + * @return the caught exception + * @throws Exception any unexpected failure. + */ + protected static E expectJobCommitFailure( + JobContext jContext, + AbstractS3ACommitter committer, + Class clazz) + throws Exception { + + return intercept(clazz, + () -> { + committer.commitJob(jContext); + return committer.toString(); + }); + } + + protected static void expectFNFEonTaskCommit( + AbstractS3ACommitter committer, + TaskAttemptContext tContext) throws Exception { + intercept(FileNotFoundException.class, + () -> { + committer.commitTask(tContext); + return committer.toString(); + }); + } + + /** + * Simulate a failure on the first job commit; expect the + * second to succeed. + */ + @Test + public void testCommitterWithNoOutputs() throws Exception { + describe("Have a task and job with no outputs: expect success"); + JobData jobData = startJob(new FailingCommitterFactory(), false); + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + // do commit + committer.commitTask(tContext); + assertTaskAttemptPathDoesNotExist(committer, tContext); + } + + protected static void expectSimulatedFailureOnJobCommit(JobContext jContext, + AbstractS3ACommitter committer) throws Exception { + ((CommitterFaultInjection) committer).setFaults( + CommitterFaultInjection.Faults.commitJob); + expectJobCommitFailure(jContext, committer, + CommitterFaultInjectionImpl.Failure.class); + } + + @Test + public void testMapFileOutputCommitter() throws Exception { + describe("Test that the committer generates map output into a directory\n" + + "starting with the prefix part-"); + JobData jobData = startJob(false); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + Configuration conf = jobData.conf; + + // write output + writeMapFileOutput(new MapFileOutputFormat().getRecordWriter(tContext), + tContext); + + // do commit + commit(committer, jContext, tContext); + S3AFileSystem fs = getFileSystem(); + waitForConsistency(); + lsR(fs, outDir, true); + String ls = ls(outDir); + describe("\nvalidating"); + + // validate output + verifySuccessMarker(outDir); + + describe("validate output of %s", outDir); + validateMapFileOutputContent(fs, outDir); + + // Ensure getReaders call works and also ignores + // hidden filenames (_ or . prefixes) + describe("listing"); + FileStatus[] filtered = fs.listStatus(outDir, HIDDEN_FILE_FILTER); + assertEquals("listed children under " + ls, + 1, filtered.length); + FileStatus fileStatus = filtered[0]; + assertTrue("Not the part file: " + fileStatus, + fileStatus.getPath().getName().startsWith(PART_00000)); + + describe("getReaders()"); + assertEquals("Number of MapFile.Reader entries with shared FS " + + outDir + " : " + ls, + 1, getReaders(fs, outDir, conf).length); + + describe("getReaders(new FS)"); + FileSystem fs2 = FileSystem.get(outDir.toUri(), conf); + assertEquals("Number of MapFile.Reader entries with shared FS2 " + + outDir + " : " + ls, + 1, getReaders(fs2, outDir, conf).length); + + describe("MapFileOutputFormat.getReaders"); + assertEquals("Number of MapFile.Reader entries with new FS in " + + outDir + " : " + ls, + 1, MapFileOutputFormat.getReaders(outDir, conf).length); + } + + /** Open the output generated by this format. 
*/ + @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") + private static MapFile.Reader[] getReaders(FileSystem fs, + Path dir, + Configuration conf) throws IOException { + Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, HIDDEN_FILE_FILTER)); + + // sort names, so that hash partitioning works + Arrays.sort(names); + + MapFile.Reader[] parts = new MapFile.Reader[names.length]; + for (int i = 0; i < names.length; i++) { + parts[i] = new MapFile.Reader(names[i], conf); + } + return parts; + } + + /** + * A functional interface which an action to test must implement. + */ + @FunctionalInterface + public interface ActionToTest { + void exec(Job job, JobContext jContext, TaskAttemptContext tContext, + AbstractS3ACommitter committer) throws Exception; + } + + @Test + public void testAbortTaskNoWorkDone() throws Exception { + executeWork("abort task no work", + (job, jContext, tContext, committer) -> + committer.abortTask(tContext)); + } + + @Test + public void testAbortJobNoWorkDone() throws Exception { + executeWork("abort task no work", + (job, jContext, tContext, committer) -> + committer.abortJob(jContext, JobStatus.State.RUNNING)); + } + + @Test + public void testCommitJobButNotTask() throws Exception { + executeWork("commit a job while a task's work is pending, " + + "expect task writes to be cancelled.", + (job, jContext, tContext, committer) -> { + // step 1: write the text + writeTextOutput(tContext); + // step 2: commit the job + createCommitter(tContext).commitJob(tContext); + // verify that no output can be observed + assertPart0000DoesNotExist(outDir); + // that includes, no pending MPUs; commitJob is expected to + // cancel any. + assertNoMultipartUploadsPending(outDir); + } + ); + } + + @Test + public void testAbortTaskThenJob() throws Exception { + JobData jobData = startJob(true); + AbstractS3ACommitter committer = jobData.committer; + + // do abort + committer.abortTask(jobData.tContext); + + intercept(FileNotFoundException.class, "", + () -> getPart0000(committer.getWorkPath())); + + committer.abortJob(jobData.jContext, JobStatus.State.FAILED); + assertJobAbortCleanedUp(jobData); + } + + /** + * Extension point: assert that the job was all cleaned up after an abort. + * Base assertions + *
+ * <ul>
+ *   <li>Output dir is absent or, if present, empty</li>
+ *   <li>No pending MPUs to/under the output dir</li>
+ * </ul>
+ * @param jobData job data + * @throws Exception failure + */ + public void assertJobAbortCleanedUp(JobData jobData) throws Exception { + // special handling of magic directory; harmless in staging + S3AFileSystem fs = getFileSystem(); + try { + FileStatus[] children = listChildren(fs, outDir); + if (children.length != 0) { + lsR(fs, outDir, true); + } + assertArrayEquals("Output directory not empty " + ls(outDir), + new FileStatus[0], children); + } catch (FileNotFoundException e) { + // this is a valid failure mode; it means the dest dir doesn't exist yet. + } + assertNoMultipartUploadsPending(outDir); + } + + @Test + public void testFailAbort() throws Exception { + describe("Abort the task, then job (failed), abort the job again"); + JobData jobData = startJob(true); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + + // do abort + committer.abortTask(tContext); + + committer.getJobAttemptPath(jContext); + committer.getTaskAttemptPath(tContext); + assertPart0000DoesNotExist(outDir); + assertSuccessMarkerDoesNotExist(outDir); + describe("Aborting job into %s", outDir); + + committer.abortJob(jContext, JobStatus.State.FAILED); + + assertTaskAttemptPathDoesNotExist(committer, tContext); + assertJobAttemptPathDoesNotExist(committer, jContext); + + // try again; expect abort to be idempotent. + committer.abortJob(jContext, JobStatus.State.FAILED); + assertNoMultipartUploadsPending(outDir); + } + + public void assertPart0000DoesNotExist(Path dir) throws Exception { + intercept(FileNotFoundException.class, + () -> getPart0000(dir)); + assertPathDoesNotExist("expected output file", new Path(dir, PART_00000)); + } + + @Test + public void testAbortJobNotTask() throws Exception { + executeWork("abort task no work", + (job, jContext, tContext, committer) -> { + // write output + writeTextOutput(tContext); + committer.abortJob(jContext, JobStatus.State.RUNNING); + assertTaskAttemptPathDoesNotExist( + committer, tContext); + assertJobAttemptPathDoesNotExist( + committer, jContext); + assertNoMultipartUploadsPending(outDir); + }); + } + + /** + * This looks at what happens with concurrent commits. + * However, the failure condition it looks for (subdir under subdir) + * is the kind of failure you see on a rename-based commit. + * + * What it will not detect is the fact that both tasks will each commit + * to the destination directory. That is: whichever commits last wins. + * + * There's no way to stop this. Instead it is a requirement that the task + * commit operation is only executed when the committer is happy to + * commit only those tasks which it knows have succeeded, and abort those + * which have not. 
+ * @throws Exception failure + */ + @Test + public void testConcurrentCommitTaskWithSubDir() throws Exception { + Job job = newJob(); + FileOutputFormat.setOutputPath(job, outDir); + final Configuration conf = job.getConfiguration(); + + final JobContext jContext = + new JobContextImpl(conf, taskAttempt0.getJobID()); + AbstractS3ACommitter amCommitter = createCommitter( + new TaskAttemptContextImpl(conf, taskAttempt0)); + amCommitter.setupJob(jContext); + + final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2]; + taCtx[0] = new TaskAttemptContextImpl(conf, taskAttempt0); + taCtx[1] = new TaskAttemptContextImpl(conf, taskAttempt1); + + final TextOutputFormat[] tof = new LoggingTextOutputFormat[2]; + for (int i = 0; i < tof.length; i++) { + tof[i] = new LoggingTextOutputFormat() { + @Override + public Path getDefaultWorkFile( + TaskAttemptContext context, + String extension) throws IOException { + final AbstractS3ACommitter foc = (AbstractS3ACommitter) + getOutputCommitter(context); + return new Path(new Path(foc.getWorkPath(), SUB_DIR), + getUniqueFile(context, getOutputName(context), extension)); + } + }; + } + + final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2); + try { + for (int i = 0; i < taCtx.length; i++) { + final int taskIdx = i; + executor.submit(() -> { + final OutputCommitter outputCommitter = + tof[taskIdx].getOutputCommitter(taCtx[taskIdx]); + outputCommitter.setupTask(taCtx[taskIdx]); + final RecordWriter rw = + tof[taskIdx].getRecordWriter(taCtx[taskIdx]); + writeOutput(rw, taCtx[taskIdx]); + describe("Committing Task %d", taskIdx); + outputCommitter.commitTask(taCtx[taskIdx]); + return null; + }); + } + } finally { + executor.shutdown(); + while (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + log().info("Awaiting thread termination!"); + } + } + + // if we commit here then all tasks will be committed, so there will + // be contention for that final directory: both parts will go in. + + describe("\nCommitting Job"); + amCommitter.commitJob(jContext); + assertPathExists("base output directory", outDir); + assertPart0000DoesNotExist(outDir); + Path outSubDir = new Path(outDir, SUB_DIR); + assertPathDoesNotExist("Must not end up with sub_dir/sub_dir", + new Path(outSubDir, SUB_DIR)); + + // validate output + // There's no success marker in the subdirectory + validateContent(outSubDir, false); + } + + /** + * Create a committer which fails; the class + * {@link CommitterFaultInjectionImpl} implements the logic. + * @param tContext task context + * @return committer instance + * @throws IOException failure to instantiate + */ + protected abstract AbstractS3ACommitter createFailingCommitter( + TaskAttemptContext tContext) throws IOException; + + /** + * Factory for failing committers. 
+ */ + public class FailingCommitterFactory implements CommitterFactory { + @Override + public AbstractS3ACommitter createCommitter(TaskAttemptContext context) + throws IOException { + return createFailingCommitter(context); + } + } + + @Test + public void testOutputFormatIntegration() throws Throwable { + Configuration conf = getConfiguration(); + Job job = newJob(); + job.setOutputFormatClass(LoggingTextOutputFormat.class); + conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, + taskAttempt0); + LoggingTextOutputFormat outputFormat = (LoggingTextOutputFormat) + ReflectionUtils.newInstance(tContext.getOutputFormatClass(), conf); + AbstractS3ACommitter committer = (AbstractS3ACommitter) + outputFormat.getOutputCommitter(tContext); + + // setup + JobData jobData = new JobData(job, jContext, tContext, committer); + setup(jobData); + abortInTeardown(jobData); + LoggingTextOutputFormat.LoggingLineRecordWriter recordWriter + = outputFormat.getRecordWriter(tContext); + IntWritable iw = new IntWritable(1); + recordWriter.write(iw, iw); + Path dest = recordWriter.getDest(); + validateTaskAttemptPathDuringWrite(dest); + recordWriter.close(tContext); + // at this point + validateTaskAttemptPathAfterWrite(dest); + assertTrue("Committer does not have data to commit " + committer, + committer.needsTaskCommit(tContext)); + committer.commitTask(tContext); + committer.commitJob(jContext); + // validate output + verifySuccessMarker(outDir); + } + + /** + * Create a committer through reflection then use it to abort + * a task. This mimics the action of an AM when a container fails and + * the AM wants to abort the task attempt. 
+ */ + @Test + public void testAMWorkflow() throws Throwable { + describe("Create a committer with a null output path & use as an AM"); + JobData jobData = startJob(true); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + + TaskAttemptContext newAttempt = taskAttemptForJob( + MRBuilderUtils.newJobId(1, 1, 1), jContext); + Configuration conf = jContext.getConfiguration(); + + // bind + LoggingTextOutputFormat.bind(conf); + + OutputFormat outputFormat + = ReflectionUtils.newInstance(newAttempt + .getOutputFormatClass(), conf); + Path outputPath = FileOutputFormat.getOutputPath(newAttempt); + assertNotNull("null output path in new task attempt", outputPath); + + AbstractS3ACommitter committer2 = (AbstractS3ACommitter) + outputFormat.getOutputCommitter(newAttempt); + committer2.abortTask(tContext); + assertNoMultipartUploadsPending(getOutDir()); + } + + + @Test + public void testParallelJobsToAdjacentPaths() throws Throwable { + + describe("Run two jobs in parallel, assert they both complete"); + JobData jobData = startJob(true); + Job job1 = jobData.job; + AbstractS3ACommitter committer1 = jobData.committer; + JobContext jContext1 = jobData.jContext; + TaskAttemptContext tContext1 = jobData.tContext; + + // now build up a second job + String jobId2 = randomJobId(); + String attempt20 = "attempt_" + jobId2 + "_m_000000_0"; + TaskAttemptID taskAttempt20 = TaskAttemptID.forName(attempt20); + String attempt21 = "attempt_" + jobId2 + "_m_000001_0"; + TaskAttemptID taskAttempt21 = TaskAttemptID.forName(attempt21); + + Path job1Dest = outDir; + Path job2Dest = new Path(getOutDir().getParent(), + getMethodName() + "job2Dest"); + // little safety check + assertNotEquals(job1Dest, job2Dest); + + // create the second job + Job job2 = newJob(job2Dest, new JobConf(getConfiguration()), attempt20); + Configuration conf2 = job2.getConfiguration(); + conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + try { + JobContext jContext2 = new JobContextImpl(conf2, + taskAttempt20.getJobID()); + TaskAttemptContext tContext2 = + new TaskAttemptContextImpl(conf2, taskAttempt20); + AbstractS3ACommitter committer2 = createCommitter(job2Dest, tContext2); + JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2); + setup(jobData2); + abortInTeardown(jobData2); + // make sure the directories are different + assertEquals(job2Dest, committer2.getOutputPath()); + + // job2 setup, write some data there + writeTextOutput(tContext2); + + // at this point, job1 and job2 both have uncommitted tasks + + // commit tasks in order task 2, task 1. 
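+      // Task commit here only persists the manifests of each task's pending
+      // multipart uploads; no data becomes visible at either destination
+      // until the corresponding job commit completes those uploads, which
+      // is what the assertMultipartUploadsPending() checks below rely on.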
+ committer2.commitTask(tContext2); + committer1.commitTask(tContext1); + + assertMultipartUploadsPending(job1Dest); + assertMultipartUploadsPending(job2Dest); + + // commit jobs in order job 1, job 2 + committer1.commitJob(jContext1); + assertNoMultipartUploadsPending(job1Dest); + getPart0000(job1Dest); + assertMultipartUploadsPending(job2Dest); + + committer2.commitJob(jContext2); + getPart0000(job2Dest); + assertNoMultipartUploadsPending(job2Dest); + } finally { + // uncommitted files to this path need to be deleted in tests which fail + abortMultipartUploadsUnderPath(job2Dest); + } + + } + + protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + + } + + protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java new file mode 100644 index 0000000..7dd1b0a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +/** + * Support for adding fault injection: all the failing committers in the IT + * tests must implement this. + */ +public interface CommitterFaultInjection { + String COMMIT_FAILURE_MESSAGE = "oops"; + + void setFaults(Faults... faults); + + /** + * Operations which can fail. + */ + enum Faults { + abortJob, + abortTask, + cleanupJob, + commitJob, + commitTask, + getWorkPath, + needsTaskCommit, + setupJob, + setupTask, + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java new file mode 100644 index 0000000..d423398 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +/** + * Implementation of the fault injection lifecycle. + * Can reset a fault on failure or always raise it. + */ +public final class CommitterFaultInjectionImpl + extends PathOutputCommitter implements CommitterFaultInjection { + + private Set faults; + private boolean resetOnFailure; + + public CommitterFaultInjectionImpl(Path outputPath, + JobContext context, + boolean resetOnFailure, + Faults... faults) throws IOException { + super(outputPath, context); + setFaults(faults); + this.resetOnFailure = resetOnFailure; + } + + @Override + public void setFaults(Faults... faults) { + this.faults = new HashSet<>(faults.length); + Collections.addAll(this.faults, faults); + } + + /** + * Fail if the condition is in the set of faults, may optionally reset + * it before failing. + * @param condition condition to check for + * @throws Failure if the condition is faulting + */ + private void maybeFail(Faults condition) throws Failure { + if (faults.contains(condition)) { + if (resetOnFailure) { + faults.remove(condition); + } + throw new Failure(); + } + } + + @Override + public Path getWorkPath() throws IOException { + maybeFail(Faults.getWorkPath); + return null; + } + + @Override + public Path getOutputPath() { + return null; + } + + + @Override + public void setupJob(JobContext jobContext) throws IOException { + maybeFail(Faults.setupJob); + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + maybeFail(Faults.setupTask); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + maybeFail(Faults.needsTaskCommit); + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + maybeFail(Faults.commitTask); + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + maybeFail(Faults.abortTask); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + maybeFail(Faults.commitJob); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) + throws IOException { + maybeFail(Faults.abortJob); + } + + /** + * The exception raised on failure. 
+ */ + public static class Failure extends IOException { + public Failure() { + super(COMMIT_FAILURE_MESSAGE); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java new file mode 100644 index 0000000..2a98382 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; + +import com.amazonaws.services.s3.model.PartETag; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; +import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*; + +/** + * Test the low-level binding of the S3A FS to the magic commit mechanism, + * and handling of the commit operations. + * This is done with an inconsistent client. 
+ */ +public class ITestCommitOperations extends AbstractCommitITest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestCommitOperations.class); + private static final byte[] DATASET = dataset(1000, 'a', 32); + private static final String S3A_FACTORY_KEY = String.format( + COMMITTER_FACTORY_SCHEME_PATTERN, "s3a"); + + /** + * A compile time flag which allows you to disable failure reset before + * assertions and teardown. + * As S3A is now required to be resilient to failure on all FS operations, + * setting it to false ensures that even the assertions are checking + * the resilience codepaths. + */ + private static final boolean RESET_FAILURES_ENABLED = false; + + private static final float HIGH_THROTTLE = 0.25f; + + private static final float FULL_THROTTLE = 1.0f; + + private static final int STANDARD_FAILURE_LIMIT = 2; + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + bindCommitter(conf, CommitConstants.S3A_COMMITTER_FACTORY, + CommitConstants.COMMITTER_NAME_MAGIC); + return conf; + } + + @Override + public boolean useInconsistentClient() { + return true; + } + + @Override + public void setup() throws Exception { + FileSystem.closeAll(); + super.setup(); + verifyIsMagicCommitFS(getFileSystem()); + // abort,; rethrow on failure + setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT); + } + + @Test + public void testCreateTrackerNormalPath() throws Throwable { + S3AFileSystem fs = getFileSystem(); + MagicCommitIntegration integration + = new MagicCommitIntegration(fs, true); + String filename = "notdelayed.txt"; + Path destFile = methodPath(filename); + String origKey = fs.pathToKey(destFile); + PutTracker tracker = integration.createTracker(destFile, origKey); + assertFalse("wrong type: " + tracker + " for " + destFile, + tracker instanceof MagicCommitTracker); + } + + /** + * On a magic path, the magic tracker is returned. 
+ * @throws Throwable failure + */ + @Test + public void testCreateTrackerMagicPath() throws Throwable { + S3AFileSystem fs = getFileSystem(); + MagicCommitIntegration integration + = new MagicCommitIntegration(fs, true); + String filename = "delayed.txt"; + Path destFile = methodPath(filename); + String origKey = fs.pathToKey(destFile); + Path pendingPath = makeMagic(destFile); + verifyIsMagicCommitPath(fs, pendingPath); + String pendingPathKey = fs.pathToKey(pendingPath); + assertTrue("wrong path of " + pendingPathKey, + pendingPathKey.endsWith(filename)); + final List<String> elements = splitPathToElements(pendingPath); + assertEquals("splitPathToElements()", filename, lastElement(elements)); + List<String> finalDestination = finalDestination(elements); + assertEquals("finalDestination()", + filename, + lastElement(finalDestination)); + final String destKey = elementsToKey(finalDestination); + assertEquals("destination key", origKey, destKey); + + PutTracker tracker = integration.createTracker(pendingPath, + pendingPathKey); + assertTrue("wrong type: " + tracker + " for " + pendingPathKey, + tracker instanceof MagicCommitTracker); + assertEquals("tracker destination key", origKey, tracker.getDestKey()); + + Path pendingSuffixedPath = new Path(pendingPath, + "part-0000" + PENDING_SUFFIX); + assertFalse("still a delayed complete path " + pendingSuffixedPath, + fs.isMagicCommitPath(pendingSuffixedPath)); + Path pendingSet = new Path(pendingPath, + "part-0000" + PENDINGSET_SUFFIX); + assertFalse("still a delayed complete path " + pendingSet, + fs.isMagicCommitPath(pendingSet)); + } + + @Test + public void testCreateAbortEmptyFile() throws Throwable { + describe("create then abort an empty file; throttled"); + S3AFileSystem fs = getFileSystem(); + String filename = "empty-abort.txt"; + Path destFile = methodPath(filename); + Path pendingFilePath = makeMagic(destFile); + touch(fs, pendingFilePath); + validateIntermediateAndFinalPaths(pendingFilePath, destFile); + Path pendingDataPath = validatePendingCommitData(filename, + pendingFilePath); + + CommitOperations actions = newCommitOperations(); + // abort; rethrow on failure + fullThrottle(); + LOG.info("Abort call"); + actions.abortAllSinglePendingCommits(pendingDataPath.getParent(), true) + .maybeRethrow(); + resetFailures(); + assertPathDoesNotExist("pending file not deleted", pendingDataPath); + assertPathDoesNotExist("dest file was created", destFile); + } + + private void fullThrottle() { + setThrottling(FULL_THROTTLE, STANDARD_FAILURE_LIMIT); + } + + private CommitOperations newCommitOperations() { + return new CommitOperations(getFileSystem()); + } + + @Override + protected void resetFailures() { + if (!RESET_FAILURES_ENABLED) { + super.resetFailures(); + } + } + + /** + * Create a new path which has the same filename as the dest file, but + * is in a magic directory under the destination dir.
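+ * For example, assuming {@code MAGIC} is {@code "__magic"}, + * {@code /out/part-0000} maps to {@code /out/__magic/part-0000}.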
+ * @param destFile final destination file + * @return magic path + */ + private static Path makeMagic(Path destFile) { + return new Path(destFile.getParent(), + MAGIC + '/' + destFile.getName()); + } + + @Test + public void testCommitEmptyFile() throws Throwable { + describe("create then commit an empty file"); + createCommitAndVerify("empty-commit.txt", new byte[0]); + } + + @Test + public void testCommitSmallFile() throws Throwable { + describe("create then commit a small file"); + createCommitAndVerify("small-commit.txt", DATASET); + } + + @Test + public void testAbortNonexistentDir() throws Throwable { + describe("Attempt to abort a directory that does not exist"); + Path destFile = methodPath("testAbortNonexistentPath"); + newCommitOperations() + .abortAllSinglePendingCommits(destFile, true) + .maybeRethrow(); + } + + @Test + public void testCommitterFactoryDefault() throws Throwable { + Configuration conf = new Configuration(); + Path dest = methodPath(); + conf.set(COMMITTER_FACTORY_CLASS, + MagicS3GuardCommitterFactory.CLASSNAME); + PathOutputCommitterFactory factory + = getCommitterFactory(dest, conf); + PathOutputCommitter committer = factory.createOutputCommitter( + methodPath(), + new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID(new TaskID(), 1))); + assertEquals("Wrong committer", + MagicS3GuardCommitter.class, committer.getClass()); + } + + @Test + public void testCommitterFactorySchema() throws Throwable { + Configuration conf = new Configuration(); + Path dest = methodPath(); + + conf.set(S3A_FACTORY_KEY, + MagicS3GuardCommitterFactory.CLASSNAME); + PathOutputCommitterFactory factory + = getCommitterFactory(dest, conf); + // the casting is an implicit type check + MagicS3GuardCommitter s3a = (MagicS3GuardCommitter) + factory.createOutputCommitter( + methodPath(), + new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID(new TaskID(), 1))); + // a wrong committer type would have failed the cast above + assertNotNull(s3a); + } + + @Test + public void testBaseRelativePath() throws Throwable { + describe("Test creating file with a __base marker and verify that it ends" + + " up where expected"); + Path destDir = methodPath("testBaseRelativePath"); + Path pendingBaseDir = new Path(destDir, MAGIC + "/child/" + BASE); + String child = "subdir/child.txt"; + Path pendingChildPath = new Path(pendingBaseDir, child); + Path expectedDestPath = new Path(destDir, child); + createFile(getFileSystem(), pendingChildPath, true, DATASET); + commit("child.txt", pendingChildPath, expectedDestPath, 0, 0); + } + + private void createCommitAndVerify(String filename, byte[] data) + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path destFile = methodPath(filename); + Path magicDest = makeMagic(destFile); + try (FSDataOutputStream stream = fs.create(magicDest, true)) { + assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT)); + if (data != null && data.length > 0) { + stream.write(data); + } + } + FileStatus status = getFileStatusEventually(fs, magicDest, + CONSISTENCY_WAIT); + assertEquals("Non-empty marker file: " + status, 0, status.getLen()); + + commit(filename, destFile, HIGH_THROTTLE, 0); + verifyFileContents(fs, destFile, data); + } + + /** + * Commit the file, with before and after checks on the dest and magic + * values. + * Failures can be set; they'll be reset after the commit.
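+ * The magic path is derived from {@code destFile} via + * {@code makeMagic(Path)}; the five-argument overload below does the + * actual work.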
+ * @param filename filename of file + * @param destFile destination path of file + * @param throttle probability of commit throttling + * @param failures failure limit + * @throws Exception any failure of the operation + */ + private void commit(String filename, + Path destFile, + float throttle, + int failures) throws Exception { + commit(filename, makeMagic(destFile), destFile, throttle, failures); + } + + /** + * Commit a write to {@code magicFile} which is expected to + * be saved to {@code destFile}. + * Failures can be set; they'll be reset after the commit. + * @param filename filename of file + * @param magicFile path to write to + * @param destFile destination to verify + * @param throttle probability of commit throttling + * @param failures failure limit + * @throws IOException any failure of the operation + */ + private void commit(String filename, + Path magicFile, + Path destFile, + float throttle, int failures) + throws IOException { + resetFailures(); + validateIntermediateAndFinalPaths(magicFile, destFile); + SinglePendingCommit commit = SinglePendingCommit.load(getFileSystem(), + validatePendingCommitData(filename, magicFile)); + CommitOperations actions = newCommitOperations(); + setThrottling(throttle, failures); + actions.commitOrFail(commit); + resetFailures(); + verifyCommitExists(commit); + } + + /** + * Validate the intermediate and final paths; currently this verifies + * that the final destination does not yet exist. + * @param magicFilePath path to magic file + * @param destFile ultimate destination file + * @throws IOException IO failure + */ + private void validateIntermediateAndFinalPaths(Path magicFilePath, + Path destFile) + throws IOException { + assertPathDoesNotExist("dest file was created", destFile); + } + + /** + * Verify that the path at the end of a commit exists. + * This does not validate the size. + * @param commit commit to verify + * @throws FileNotFoundException dest doesn't exist + * @throws ValidationFailure commit arg is invalid + * @throws IOException invalid commit, IO failure + */ + private void verifyCommitExists(SinglePendingCommit commit) + throws FileNotFoundException, ValidationFailure, IOException { + commit.validate(); + // this will force an existence check + Path path = getFileSystem().keyToQualifiedPath(commit.getDestinationKey()); + FileStatus status = getFileSystem().getFileStatus(path); + LOG.debug("Destination entry: {}", status); + if (!status.isFile()) { + throw new PathCommitException(path, "Not a file: " + status); + } + } + + /** + * Validate that a pending commit data file exists, load it and validate + * its contents.
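+ * The pending data is expected alongside the magic file, at + * {@code filename + PENDING_SUFFIX} in the same parent directory, and + * must contain exactly one etag/part entry.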
+ * @param filename short file name + * @param magicFile path that the file thinks it was written to + * @return the path to the pending set + * @throws IOException IO problems + */ + private Path validatePendingCommitData(String filename, + Path magicFile) throws IOException { + S3AFileSystem fs = getFileSystem(); + Path pendingDataPath = new Path(magicFile.getParent(), + filename + PENDING_SUFFIX); + FileStatus fileStatus = verifyPathExists(fs, "no pending file", + pendingDataPath); + assertTrue("No data in " + fileStatus, fileStatus.getLen() > 0); + String data = read(fs, pendingDataPath); + LOG.info("Contents of {}: \n{}", pendingDataPath, data); + // really read it in and parse + SinglePendingCommit persisted = SinglePendingCommit.serializer() + .load(fs, pendingDataPath); + persisted.validate(); + assertTrue("created timestamp wrong in " + persisted, + persisted.getCreated() > 0); + assertTrue("saved timestamp wrong in " + persisted, + persisted.getSaved() > 0); + List<String> etags = persisted.getEtags(); + assertEquals("etag list " + persisted, 1, etags.size()); + List<PartETag> partList = CommitOperations.toPartEtags(etags); + assertEquals("part list " + persisted, 1, partList.size()); + return pendingDataPath; + } + + /** + * Get a method-relative path. + * @param filename filename + * @return new path + * @throws IOException failure to create/parse the path. + */ + private Path methodPath(String filename) throws IOException { + return new Path(methodPath(), filename); + } + + /** + * Get a unique path for a method. + * @return a path + * @throws IOException failure to create/parse the path. + */ + protected Path methodPath() throws IOException { + return path(getMethodName()); + } + + @Test + public void testUploadEmptyFile() throws Throwable { + File tempFile = File.createTempFile("commit", ".txt"); + CommitOperations actions = newCommitOperations(); + Path dest = methodPath("testUploadEmptyFile"); + S3AFileSystem fs = getFileSystem(); + fs.delete(dest, false); + fullThrottle(); + + SinglePendingCommit pendingCommit = + actions.uploadFileToPendingCommit(tempFile, + dest, null, + DEFAULT_MULTIPART_SIZE); + resetFailures(); + assertPathDoesNotExist("pending commit", dest); + fullThrottle(); + actions.commitOrFail(pendingCommit); + resetFailures(); + FileStatus status = verifyPathExists(fs, + "uploaded file commit", dest); + assertEquals("File length in " + status, 0, status.getLen()); + } + + @Test + public void testUploadSmallFile() throws Throwable { + File tempFile = File.createTempFile("commit", ".txt"); + String text = "hello, world"; + FileUtils.write(tempFile, text, "UTF-8"); + CommitOperations actions = newCommitOperations(); + Path dest = methodPath("testUploadSmallFile"); + S3AFileSystem fs = getFileSystem(); + fullThrottle(); + SinglePendingCommit pendingCommit = + actions.uploadFileToPendingCommit(tempFile, + dest, null, + DEFAULT_MULTIPART_SIZE); + resetFailures(); + assertPathDoesNotExist("pending commit", dest); + fullThrottle(); + actions.commitOrFail(pendingCommit); + resetFailures(); + String s = readUTF8(fs, dest, -1); + assertEquals(text, s); + } + + @Test(expected = FileNotFoundException.class) + public void testUploadMissingFile() throws Throwable { + File tempFile = File.createTempFile("commit", ".txt"); + tempFile.delete(); + CommitOperations actions = newCommitOperations(); + Path dest = methodPath("testUploadMissingFile"); + fullThrottle(); + actions.uploadFileToPendingCommit(tempFile, dest, null, + DEFAULT_MULTIPART_SIZE); + } + + @Test + public void testRevertCommit() throws Throwable { + Path 
destFile = methodPath("part-0000"); + S3AFileSystem fs = getFileSystem(); + touch(fs, destFile); + CommitOperations actions = newCommitOperations(); + SinglePendingCommit commit = new SinglePendingCommit(); + commit.setDestinationKey(fs.pathToKey(destFile)); + fullThrottle(); + actions.revertCommit(commit); + resetFailures(); + assertPathExists("parent of reverted commit", destFile.getParent()); + } + + @Test + public void testRevertMissingCommit() throws Throwable { + Path destFile = methodPath("part-0000"); + S3AFileSystem fs = getFileSystem(); + fs.delete(destFile, false); + CommitOperations actions = newCommitOperations(); + SinglePendingCommit commit = new SinglePendingCommit(); + commit.setDestinationKey(fs.pathToKey(destFile)); + fullThrottle(); + actions.revertCommit(commit); + assertPathExists("parent of reverted (nonexistent) commit", + destFile.getParent()); + } + + @Test + public void testFailuresInAbortListing() throws Throwable { + CommitOperations actions = newCommitOperations(); + Path path = path("testFailuresInAbort"); + getFileSystem().mkdirs(path); + setThrottling(HIGH_THROTTLE); + LOG.info("Aborting"); + actions.abortPendingUploadsUnderPath(path); + LOG.info("Abort completed"); + resetFailures(); + } + + /** + * Test a normal stream still works as expected in a magic filesystem, + * with a call to {@code hasCapability()} to check that it is normal. + * @throws Throwable failure + */ + @Test + public void testWriteNormalStream() throws Throwable { + S3AFileSystem fs = getFileSystem(); + Assume.assumeTrue(fs.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT)); + + Path destFile = path("normal"); + try (FSDataOutputStream out = fs.create(destFile, true)) { + out.writeChars("data"); + assertFalse("stream has magic output: " + out, + out.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT)); + } + FileStatus status = getFileStatusEventually(fs, destFile, + CONSISTENCY_WAIT); + assertTrue("Empty marker file: " + status, status.getLen() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java new file mode 100644 index 0000000..1ac8038 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * A subclass of {@link TextOutputFormat} which logs what is happening, and + * returns a {@link LoggingLineRecordWriter} which allows the caller + * to get the destination path. + * @param <K> key type + * @param <V> value type + */ +public class LoggingTextOutputFormat<K, V> extends TextOutputFormat<K, V> { + private static final Logger LOG = + LoggerFactory.getLogger(LoggingTextOutputFormat.class); + + public static final String NAME + = "org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat"; + + @Override + public LoggingLineRecordWriter<K, V> getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + boolean isCompressed = getCompressOutput(job); + String keyValueSeparator = conf.get(SEPARATOR, "\t"); + CompressionCodec codec = null; + String extension = ""; + if (isCompressed) { + Class<? extends CompressionCodec> codecClass = + getOutputCompressorClass(job, GzipCodec.class); + codec = ReflectionUtils.newInstance(codecClass, conf); + extension = codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(job, extension); + FileSystem fs = file.getFileSystem(conf); + FSDataOutputStream fileOut = fs.create(file, false); + LOG.debug("Creating LineRecordWriter with destination {}", file); + if (isCompressed) { + return new LoggingLineRecordWriter<>( + file, new DataOutputStream(codec.createOutputStream(fileOut)), + keyValueSeparator); + } else { + return new LoggingLineRecordWriter<>(file, fileOut, keyValueSeparator); + } + } + + /** + * Write a line; counts the number of lines written and logs at debug in + * the {@code close()} call. + * @param <K> key type + * @param <V> value type + */ + public static class LoggingLineRecordWriter<K, V> + extends LineRecordWriter<K, V> { + private final Path dest; + private long lines; + + public LoggingLineRecordWriter(Path dest, DataOutputStream out, + String keyValueSeparator) { + super(out, keyValueSeparator); + this.dest = dest; + } + + public LoggingLineRecordWriter(DataOutputStream out, Path dest) { + super(out); + this.dest = dest; + } + + @Override + public synchronized void write(K key, V value) throws IOException { + super.write(key, value); + lines++; + } + + @Override + public synchronized void close(TaskAttemptContext context) + throws IOException { + LOG.debug("Closing output file {} with {} lines: {}", + dest, lines, out); + out.close(); + } + + public Path getDest() { + return dest; + } + + public long getLines() { + return lines; + } + } + + /** + * Bind to a configuration for job submission.
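+ * That is: set {@code MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR} so that jobs + * submitted with this configuration use {@code LoggingTextOutputFormat}.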
+ * @param conf configuration + */ + public static void bind(Configuration conf) { + conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, + LoggingTextOutputFormat.class, + OutputFormat.class); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java new file mode 100644 index 0000000..7f689e0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.service.AbstractService; + +/** + * MiniDFS Cluster, encapsulated for use in different test suites. + */ +public class MiniDFSClusterService extends AbstractService { + + public MiniDFSClusterService() { + super("MiniDFSTestCluster"); + } + + private MiniDFSCluster cluster = null; + private FileSystem clusterFS = null; + private LocalFileSystem localFS = null; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + conf.setBoolean("dfs.webhdfs.enabled", false); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .format(true) + .racks(null) + .build(); + clusterFS = cluster.getFileSystem(); + localFS = FileSystem.getLocal(clusterFS.getConf()); + } + + @Override + protected void serviceStop() throws Exception { + clusterFS = null; + localFS = null; + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + public MiniDFSCluster getCluster() { + return cluster; + } + + public FileSystem getClusterFS() { + return clusterFS; + } + + public LocalFileSystem getLocalFS() { + return localFS; + } +}