Date: Wed, 25 Oct 2017 18:14:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: common-issues@hadoop.apache.org
Subject: [jira] [Commented] (HADOOP-14971) Merge S3A committers into trunk

    [ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219216#comment-16219216 ]

ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/282#discussion_r146941813

--- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java ---
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import static com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+
+/**
+ * Committer based on the contributed work of the
+ * Netflix multipart committers.
+ *
+ * 1. The working directory of each task is actually under a temporary
+ *    path in the local filesystem; jobs write directly into it.
+ * 2. Task Commit: list all files under the task working dir, upload
+ *    each of them but do not commit the final operation.
+ *    Persist the information for each pending commit into the cluster
+ *    for enumeration and commit by the job committer.
+ * 3. Task Abort: recursive delete of task working dir.
+ * 4. Job Commit: list all pending PUTs to commit; commit them.
+ * 5. Job Abort: list all pending PUTs to commit; abort them.
+ *    Delete all task attempt directories.
+ *
+ */ +public class StagingCommitter extends AbstractS3GuardCommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + StagingCommitter.class); + public static final String NAME = "StagingCommitter"; + private final Path constructorOutputPath; + private final long uploadPartSize; + private final String uuid; + private final boolean uniqueFilenames; + private final FileOutputCommitter wrappedCommitter; + + private ConflictResolution conflictResolution; + private final Path finalOutputPath; + private String s3KeyPrefix = null; + + /** The directory in the cluster FS for commits to go to. */ + private Path commitsDirectory; + + /** + * Committer for a single task attempt. + * @param outputPath final output path + * @param context task context + * @throws IOException on a failure + */ + public StagingCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + this.constructorOutputPath = checkNotNull(getOutputPath(), "output path"); + Configuration conf = getConf(); + this.uploadPartSize = conf.getLongBytes( + MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + + // Spark will use a fake app ID based on the current minute and job ID 0. + // To avoid collisions, use the YARN application ID for Spark. + this.uuid = getUploadUUID(conf, context.getJobID()); + this.uniqueFilenames = conf.getBoolean( + FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES); + setWorkPath(buildWorkPath(context, uuid)); + this.wrappedCommitter = createWrappedCommitter(context, conf); + // forces evaluation and caching of the resolution mode. + ConflictResolution mode = getConflictResolutionMode(getJobContext()); + LOG.debug("Conflict resolution mode: {}", mode); + this.finalOutputPath = constructorOutputPath; + Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null"); + S3AFileSystem fs = getS3AFileSystem(finalOutputPath, + context.getConfiguration(), false); + s3KeyPrefix = fs.pathToKey(finalOutputPath); + LOG.debug("{}: final output path is {}", getRole(), finalOutputPath); + setOutputPath(finalOutputPath); + } + + @Override + public String getName() { + return NAME; + } + + /** + * Create the wrapped committer. + * This includes customizing its options, and setting up the destination + * directory. + * @param context job/task context. + * @param conf config + * @return the inner committer + * @throws IOException on a failure + */ + protected FileOutputCommitter createWrappedCommitter(JobContext context, + Configuration conf) throws IOException { + + // explicitly choose commit algorithm + initFileOutputCommitterOptions(context); + commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid); + FileSystem stagingFS = commitsDirectory.getFileSystem(conf); + Path qualified = stagingFS.makeQualified(commitsDirectory); + if (stagingFS instanceof S3AFileSystem) { + // currently refuse to work with S3a for the working FS; you need + // a consistent FS. This isn't entirely true with s3guard and + // alternative S3 endpoints, but doing it now stops + // accidental use of S3 + throw new PathIOException(qualified.toUri().toString(), + "Directory for intermediate work cannot be on S3"); + } + return new FileOutputCommitter(qualified, context); + } + + /** + * Init the context config with everything needed for the file output + * committer. In particular, this code currently only works with + * commit algorithm 1. + * @param context context to configure. 
+ */ + protected void initFileOutputCommitterOptions(JobContext context) { + context.getConfiguration() + .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("StagingCommitter{"); + sb.append(super.toString()); + sb.append(", finalOutputPath=").append(finalOutputPath); + sb.append(", conflictResolution=").append(conflictResolution); + if (wrappedCommitter != null) { + sb.append(", wrappedCommitter=").append(wrappedCommitter); + } + sb.append('}'); + return sb.toString(); + } + + /** + * Get the UUID of an upload; may be the job ID. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. + */ + public static String getUploadUUID(Configuration conf, String jobId) { + return conf.getTrimmed(FS_S3A_COMMITTER_STAGING_UUID, + conf.get(SPARK_WRITE_UUID, + conf.getTrimmed(SPARK_APP_ID, jobId))); + } + + /** + * Get the UUID of an upload; may be the job ID. + * @param conf job/task configuration + * @param jobId Job ID + * @return an ID for use in paths. + */ + public static String getUploadUUID(Configuration conf, JobID jobId) { + return getUploadUUID(conf, jobId.toString()); + } + + /** + * Get the work path for a task. + * @param context job/task complex + * @param uuid UUID + * @return a path or null if the context is not of a task + * @throws IOException failure to build the path + */ + private static Path buildWorkPath(JobContext context, String uuid) + throws IOException { + if (context instanceof TaskAttemptContext) { + return taskAttemptWorkingPath((TaskAttemptContext) context, uuid); + } else { + return null; + } + } + + /** + * The staging committers do not require "magic" commit support. + * @return false + */ + @Override + protected boolean isMagicFileSystemRequired() { + return false; + } + + /** + * Is this committer using unique filenames? + * @return true if unique filenames are used. + */ + public Boolean useUniqueFilenames() { + return uniqueFilenames; + } + + /** + * Get the filesystem for the job attempt. + * @param context the context of the job. This is used to get the + * application attempt ID. + * @return the FS to store job attempt data. + * @throws IOException failure to create the FS. + */ + public FileSystem getJobAttemptFileSystem(JobContext context) + throws IOException { + Path p = getJobAttemptPath(context); + return p.getFileSystem(context.getConfiguration()); + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param context the context of the job. This is used to get the + * application attempt ID. + * @param out the output path to place these in. + * @return the path to store job attempt data. + */ + public static Path getJobAttemptPath(JobContext context, Path out) { + return getJobAttemptPath(getAppAttemptId(context), out); + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param appAttemptId the ID of the application attempt for this job. + * @return the path to store job attempt data. + */ + private static Path getJobAttemptPath(int appAttemptId, Path out) { + return new Path(getPendingJobAttemptsPath(out), + String.valueOf(appAttemptId)); + } + + @Override + protected Path getJobAttemptPath(int appAttemptId) { + return new Path(getPendingJobAttemptsPath(commitsDirectory), + String.valueOf(appAttemptId)); + } + + /** + * Compute the path where the output of pending task attempts are stored. 
+ * @param context the context of the job with pending tasks. + * @return the path where the output of pending task attempts are stored. + */ + private static Path getPendingTaskAttemptsPath(JobContext context, Path out) { + return new Path(getJobAttemptPath(context, out), + TEMPORARY); + } + + /** + * Compute the path where the output of a task attempt is stored until + * that task is committed. + * + * @param context the context of the task attempt. + * @param out The output path to put things in. + * @return the path where a task attempt should be stored. + */ + public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) { + return new Path(getPendingTaskAttemptsPath(context, out), + String.valueOf(context.getTaskAttemptID())); + } + + /** + * Get the location of pending job attempts. + * @param out the base output directory. + * @return the location of pending job attempts. + */ + private static Path getPendingJobAttemptsPath(Path out) { + Preconditions.checkNotNull(out, "Null 'out' path"); + return new Path(out, TEMPORARY); + } + + /** + * Compute the path where the output of a committed task is stored until + * the entire job is committed. + * @param context the context of the task attempt + * @return the path where the output of a committed task is stored until + * the entire job is committed. + */ + public Path getCommittedTaskPath(TaskAttemptContext context) { + return getCommittedTaskPath(getAppAttemptId(context), context); + } + + /** + * Validate the task attempt context; makes sure + * that the task attempt ID data is valid. + * @param context task context + */ + private static void validateContext(TaskAttemptContext context) { + Preconditions.checkNotNull(context, "null context"); + Preconditions.checkNotNull(context.getTaskAttemptID(), + "null task attempt ID"); + Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(), + "null task ID"); + Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(), + "null job ID"); + } + + /** + * Compute the path where the output of a committed task is stored until the + * entire job is committed for a specific application attempt. + * @param appAttemptId the ID of the application attempt to use + * @param context the context of any task. + * @return the path where the output of a committed task is stored. + */ + protected Path getCommittedTaskPath(int appAttemptId, + TaskAttemptContext context) { + validateContext(context); + return new Path(getJobAttemptPath(appAttemptId), + String.valueOf(context.getTaskAttemptID().getTaskID())); + } + + @Override + public Path getTempTaskAttemptPath(TaskAttemptContext context) { + throw new UnsupportedOperationException("Unimplemented"); + } + + /** + * Lists the output of a task under the task attempt path. Subclasses can + * override this method to change how output files are identified. + *

+ * This implementation lists the files that are direct children of the output + * path and filters hidden files (file names starting with '.' or '_'). + *

+ * The task attempt path is provided by + * {@link #getTaskAttemptPath(TaskAttemptContext)} + * + * @param context this task's {@link TaskAttemptContext} + * @return the output files produced by this task in the task attempt path + * @throws IOException on a failure + */ + protected List getTaskOutput(TaskAttemptContext context) + throws IOException { + PathFilter filter = Paths.HiddenPathFilter.get(); + + // get files on the local FS in the attempt path + Path attemptPath = getTaskAttemptPath(context); + Preconditions.checkNotNull(attemptPath, + "No attemptPath path in {}", this); + + LOG.debug("Scanning {} for files to commit", attemptPath); + + return flatmapLocatedFiles( + getTaskAttemptFilesystem(context) + .listFiles(attemptPath, true), + s -> maybe(filter.accept(s.getPath()), s)); + } + + /** + * Returns the final S3 key for a relative path. Subclasses can override this + * method to upload files to a different S3 location. + *

+ * This implementation concatenates the relative path with the key prefix + * from the output path. + * If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is + * set, then the task UUID is also included in the calculation + * + * @param relative the path of a file relative to the task attempt path + * @param context the JobContext or TaskAttemptContext for this job + * @return the S3 key where the file will be uploaded + * @throws IOException on a failure + */ + protected String getFinalKey(String relative, JobContext context) + throws IOException { + if (uniqueFilenames) { + return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid); + } else { + return getS3KeyPrefix(context) + "/" + relative; + } + } + + /** + * Returns the final S3 location for a relative path as a Hadoop {@link Path}. + * This is a final method that calls {@link #getFinalKey(String, JobContext)} + * to determine the final location. + * + * @param relative the path of a file relative to the task attempt path + * @param context the JobContext or TaskAttemptContext for this job + * @return the S3 Path where the file will be uploaded + * @throws IOException on a failure + */ + protected final Path getFinalPath(String relative, JobContext context) + throws IOException { + return getDestS3AFS().keyToQualifiedPath(getFinalKey(relative, context)); + } + + /** + * Return the local work path as the destination for writing work. + * @param context the context of the task attempt. + * @return a path in the local filesystem. + */ + @Override + public Path getBaseTaskAttemptPath(TaskAttemptContext context) { + // a path on the local FS for files that will be uploaded + return getWorkPath(); + } + + /** + * For a job attempt path, the staging committer returns that of the + * wrapped committer. + * @param context the context of the job. + * @return a path in HDFS. + */ + @Override + public Path getJobAttemptPath(JobContext context) { + return wrappedCommitter.getJobAttemptPath(context); + } + + /** + * Set up the job, including calling the same method on the + * wrapped committer. + * @param context job context + * @throws IOException IO failure. + */ + @Override + public void setupJob(JobContext context) throws IOException { + LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context)); + context.getConfiguration().set(FS_S3A_COMMITTER_STAGING_UUID, uuid); + wrappedCommitter.setupJob(context); + } + + /** + * Get the list of pending uploads for this job attempt. + * @param context job context + * @return a list of pending uploads. + * @throws IOException Any IO failure + */ + protected List getPendingUploadsToCommit( + JobContext context) + throws IOException { + return listPendingUploads(context, false); + } + + /** + * Get the list of pending uploads for this job attempt, swallowing + * exceptions. + * @param context job context + * @return a list of pending uploads. If an exception was swallowed, + * then this may not match the actual set of pending operations + * @throws IOException shouldn't be raised, but retained for the compiler + */ + protected List listPendingUploadsToAbort( + JobContext context) throws IOException { + return listPendingUploads(context, true); + } + + /** + * Get the list of pending uploads for this job attempt. + * @param context job context + * @param suppressExceptions should exceptions be swallowed? + * @return a list of pending uploads. 
If exceptions are being swallowed, + * then this may not match the actual set of pending operations + * @throws IOException Any IO failure which wasn't swallowed. + */ + protected List listPendingUploads( + JobContext context, boolean suppressExceptions) throws IOException { + Path jobAttemptPath = wrappedCommitter.getJobAttemptPath(context); + final FileSystem attemptFS = jobAttemptPath.getFileSystem( + context.getConfiguration()); + FileStatus[] pendingCommitFiles; + try { + pendingCommitFiles = attemptFS.listStatus( + jobAttemptPath, Paths.HiddenPathFilter.get()); + } catch (FileNotFoundException e) { + // file is not present, raise without bothering to report + throw e; + } catch (IOException e) { + // unable to work with endpoint, if suppressing errors decide our actions + if (suppressExceptions) { + LOG.info("{} failed to list pending upload dir", getRole(), e); + return new ArrayList<>(0); + } else { + throw e; + } + } + return loadMultiplePendingCommitFiles(context, + suppressExceptions, attemptFS, pendingCommitFiles); + } + + /** + * Commit work. + * This consists of two stages: precommit and commit. + *

+ * Precommit: identify pending uploads, then allow subclasses + * to validate the state of the destination and the pending uploads. + * Any failure here triggers an abort of all pending uploads. + *

+ * Commit internal: do the final commit sequence. + *

+ * The final commit action is to build the {@code __SUCCESS} file entry. + *

+ * @param context job context + * @throws IOException any failure + */ + @Override + public void commitJob(JobContext context) throws IOException { + List pending = Collections.emptyList(); + try (DurationInfo d = new DurationInfo(LOG, + "%s: preparing to commit Job", getRole())) { + pending = getPendingUploadsToCommit(context); + preCommitJob(context, pending); + } catch (IOException e) { + LOG.warn("Precommit failure for job {}", jobIdString(context), e); + abortJobInternal(context, pending, true); + getCommitOperations().jobCompleted(false); + throw e; + } + try (DurationInfo d = new DurationInfo(LOG, + "%s: committing Job %s", getRole(), jobIdString(context))) { + commitJobInternal(context, pending); + } catch (IOException e) { + getCommitOperations().jobCompleted(false); + throw e; + } + getCommitOperations().jobCompleted(true); + maybeCreateSuccessMarkerFromCommits(context, pending); + } + + /** + * Subclass-specific pre commit actions. + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + protected void preCommitJob(JobContext context, + List pending) throws IOException { + } + + @Override + public void cleanupStagingDirs() throws IOException { + Path workPath = getWorkPath(); + if (workPath != null) { + LOG.debug("Cleaning up work path {}", workPath); + deleteQuietly(workPath.getFileSystem(getConf()), + workPath, true); + } + } + + /** + * Cleanup includes: deleting job attempt pending paths, + * local staging directories, and the directory of the wrapped committer. + * @param context job context + * @param suppressExceptions should exceptions be suppressed? + * @throws IOException IO failure. + */ + @Override + @SuppressWarnings("deprecation") + protected void cleanup(JobContext context, boolean suppressExceptions) + throws IOException { + try { + wrappedCommitter.cleanupJob(context); + deleteDestinationPaths(context); + cleanupStagingDirs(); + } catch (IOException e) { + if (suppressExceptions) { + LOG.error("{}: failed while cleaning up job", getRole(), e); + } else { + throw e; + } + } + } + + /** + * Delete the destination paths of a job. + * @param context job context + * @throws IOException IO failure + */ + protected void deleteDestinationPaths(JobContext context) throws IOException { + try { + deleteWithWarning(getJobAttemptFileSystem(context), + getJobAttemptPath(context), true); + } catch (IOException e) { + LOG.debug("{}: delete failure", getRole(), e); + } + + // delete the __temporary directory. 
This will cause problems + // if there is >1 task targeting the same dest dir + deleteWithWarning(getDestFS(), + new Path(getFinalOutputPath(), TEMPORARY), + true); + // and the working path + deleteTaskWorkingPathQuietly(context); + } + + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + Path taskAttemptPath = getTaskAttemptPath(context); + try (DurationInfo d = new DurationInfo(LOG, + "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) { + // create the local FS + taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath); + wrappedCommitter.setupTask(context); + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "%s: needsTaskCommit() Task %s", getRole(), context.getTaskAttemptID())) { + // check for files on the local FS in the attempt path + Path attemptPath = getTaskAttemptPath(context); + FileSystem fs = getTaskAttemptFilesystem(context); + + // This could be made more efficient with a probe "hasChildren(Path)" + // which returns true if there is >1 entry under a given path. + FileStatus[] stats = fs.listStatus(attemptPath); + LOG.debug("{} files to commit under {}", stats.length, attemptPath); + return stats.length > 0; + } catch (FileNotFoundException e) { + // list didn't find a directory, so nothing to commit + // TODO: throw this up as an error? + LOG.info("No files to commit"); + throw e; + } + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "%s: commit task %s", getRole(), context.getTaskAttemptID())) { + int count = commitTaskInternal(context, getTaskOutput(context)); + LOG.info("{}: upload file count: {}", getRole(), count); + } catch (IOException e) { + LOG.error("{}: commit of task {} failed", + getRole(), context.getTaskAttemptID(), e); + getCommitOperations().taskCompleted(false); + throw e; + } + getCommitOperations().taskCompleted(true); + } + + /** + * Commit the task by uploading all created files and then + * writing a pending entry for them. + * @param context task context + * @param taskOutput list of files from the output + * @return number of uploads committed. + * @throws IOException IO Failures. + */ + protected int commitTaskInternal(final TaskAttemptContext context, + List taskOutput) + throws IOException { + LOG.debug("{}: commitTaskInternal", getRole()); + Configuration conf = context.getConfiguration(); + + final Path attemptPath = getTaskAttemptPath(context); + FileSystem attemptFS = getTaskAttemptFilesystem(context); + LOG.debug("{}: attempt path is {}", getRole(), attemptPath); + + // add the commits file to the wrapped committer's task attempt location. + // of this method. + Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context); + FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf); + + // keep track of unfinished commits in case one fails. if something fails, + // we will try to abort the ones that had already succeeded. + int commitCount = taskOutput.size(); + final Queue commits = new ConcurrentLinkedQueue<>(); + LOG.info("{}: uploading from staging directory to S3", getRole()); + LOG.info("{}: Saving pending data information to {}", + getRole(), commitsAttemptPath); + if (taskOutput.isEmpty()) { + // there is nothing to write. needsTaskCommit() should have caught + // this, so warn that there is some kind of problem in the protocol. 
+ LOG.warn("{}: No files to commit", getRole()); + } else { + boolean threw = true; + // before the uploads, report some progress + context.progress(); + + PendingSet pendingCommits = new PendingSet(commitCount); + try { + Tasks.foreach(taskOutput) + .stopOnFailure() + .throwFailureWhenFinished() --- End diff -- done > Merge S3A committers into trunk > ------------------------------- > > Key: HADOOP-14971 > URL: https://issues.apache.org/jira/browse/HADOOP-14971 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.0.0 > Reporter: Steve Loughran > Assignee: Steve Loughran > > Merge the HADOOP-13786 committer into trunk. This branch is being set up as a github PR for review there & to keep it out the mailboxes of the watchers on the main JIRA -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-issues-help@hadoop.apache.org