From: "ASF GitHub Bot (JIRA)"
To: common-issues@hadoop.apache.org
Date: Wed, 25 Oct 2017 16:28:00 +0000 (UTC)
Subject: [jira] [Commented] (HADOOP-14971) Merge S3A committers into trunk

    [ https://issues.apache.org/jira/browse/HADOOP-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218999#comment-16218999 ]

ASF GitHub Bot commented on HADOOP-14971:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/282#discussion_r146912753

    --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java ---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.fs.s3a.commit.staging;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.Set;
    +
    +import com.google.common.base.Objects;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Sets;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.PathIsDirectoryException;
    +import org.apache.hadoop.mapreduce.MRJobConfig;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
    +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
    +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
    +
    +/**
    + * Path operations for the staging committers.
    + */
    +public final class Paths {
    +
    +  private Paths() {
    +  }
    +
    +  /**
    +   * Insert the UUID into a path if it is not there already.
    +   * If there is a trailing "." in the prefix after the last slash, the
    +   * UUID is inserted before it with a "-" prefix; otherwise appended.
    +   *
    +   * Examples:
    +   * <pre>
    +   *   /example/part-0000  ==> /example/part-0000-0ab34
    +   *   /example/part-0001.gz.csv  ==> /example/part-0001-0ab34.gz.csv
    +   *   /example/part-0002-0abc3.gz.csv  ==> /example/part-0002-0abc3.gz.csv
    +   *   /example0abc3/part-0002.gz.csv  ==> /example0abc3/part-0002.gz.csv
    +   * </pre>
    +   *
    +   *
    +   * @param pathStr path as a string; must not have a trailing "/".
    +   * @param uuid UUID to append; must not be empty
    +   * @return new path.
    +   */
    +  public static String addUUID(String pathStr, String uuid) {
    +    Preconditions.checkArgument(StringUtils.isNotEmpty(pathStr), "empty path");
    +    Preconditions.checkArgument(StringUtils.isNotEmpty(uuid), "empty uuid");
    +    // In some cases, Spark will add the UUID to the filename itself.
    +    if (pathStr.contains(uuid)) {
    +      return pathStr;
    +    }
    +
    +    int dot; // location of the first '.' in the file name
    +    int lastSlash = pathStr.lastIndexOf('/');
    +    if (lastSlash >= 0) {
    +      Preconditions.checkState(lastSlash + 1 < pathStr.length(),
    +          "Bad path: " + pathStr);
    +      dot = pathStr.indexOf('.', lastSlash);
    +    } else {
    +      dot = pathStr.indexOf('.');
    +    }
    +
    +    if (dot >= 0) {
    +      return pathStr.substring(0, dot) + "-" + uuid + pathStr.substring(dot);
    +    } else {
    +      return pathStr + "-" + uuid;
    +    }
    +  }
    +
    +  /**
    +   * Get the parent path of a string path: everything up to but excluding
    +   * the last "/" in the path.
    +   * @param pathStr path as a string
    +   * @return the parent or null if there is no parent.
    +   */
    +  public static String getParent(String pathStr) {
    +    int lastSlash = pathStr.lastIndexOf('/');
    +    if (lastSlash >= 0) {
    +      return pathStr.substring(0, lastSlash);
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Using {@code URI#relativize()}, build the relative path from the
    +   * base path to the full path.
    +   * If {@code childPath} is not a child of {@code basePath}, the outcome
    +   * is undefined.
    +   * @param basePath base path
    +   * @param childPath full path under the base path.
    +   * @return the relative path
    +   */
    +  public static String getRelativePath(Path basePath,
    +      Path childPath) {
    +    // Use URI.create(Path#toString) to avoid URI character escape bugs
    +    URI relative = URI.create(basePath.toString())
    +        .relativize(URI.create(childPath.toString()));
    +    return relative.getPath();
    +  }
    +
    +  /**
    +   * Varargs constructor of paths. Not very efficient.
    +   * @param parent parent path
    +   * @param child child entries. "" elements are skipped.
    +   * @return the full child path.
    +   */
    +  public static Path path(Path parent, String... child) {
    +    Path p = parent;
    +    for (String c : child) {
    +      if (!c.isEmpty()) {
    +        p = new Path(p, c);
    +      }
    +    }
    +    return p;
    +  }
    +
    +  /**
    +   * Get the task attempt temporary directory in the local filesystem.
    +   * @param conf configuration
    +   * @param uuid some UUID, such as a job UUID
    +   * @param attempt attempt ID
    +   * @return a local task attempt directory.
    +   * @throws IOException IO problem.
    +   */
    +  public static Path getLocalTaskAttemptTempDir(Configuration conf,
    +      String uuid, TaskAttemptID attempt) throws IOException {
    +    int taskId = attempt.getTaskID().getId();
    +    int attemptId = attempt.getId();
    +    return path(localTemp(conf, taskId, attemptId),
    +        uuid,
    +        Integer.toString(getAppAttemptId(conf)),
    +        attempt.toString());
    +  }
    +
    +  /**
    +   * Try to come up with a good temp directory for different filesystems.
    +   * @param fs filesystem
    +   * @param conf configuration
    +   * @return a path under which temporary work can go.
    +   */
    +  public static Path tempDirForStaging(FileSystem fs,
    +      Configuration conf) {
    +    Path temp;
    +    switch (fs.getScheme()) {
    +    case "file":
    +      temp = fs.makeQualified(
    +          new Path(System.getProperty(JAVA_IO_TMPDIR)));
    +      break;
    +
    +    case "s3a":
    +      // the Staging committer may reject this if it doesn't believe S3A
    +      // is consistent.
    +      temp = fs.makeQualified(new Path(FILESYSTEM_TEMP_PATH));
    +      break;
    +
    +    // here assume that /tmp is valid
    +    case "hdfs":
    +    default:
    +      String pathname = conf.getTrimmed(
    +          FS_S3A_COMMITTER_STAGING_TMP_PATH, FILESYSTEM_TEMP_PATH);
    +      temp = fs.makeQualified(new Path(pathname));
    +    }
    +    return temp;
    +  }
    +
    +  /**
    +   * Get the Application Attempt ID for this job.
    +   * @param conf the config to look in
    +   * @return the Application Attempt ID for a given job.
    +   */
    +  private static int getAppAttemptId(Configuration conf) {
    +    return conf.getInt(
    +        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
    +  }
    +
    +  /**
    +   * Build a temporary path for the multipart upload commit information
    +   * in the supplied filesystem (which is expected to be the cluster FS).
    +   * @param conf configuration defining default FS.
    +   * @param uuid uuid of job
    +   * @return a path which can be used for temporary work
    +   * @throws IOException on an IO failure.
    +   */
    +  public static Path getMultipartUploadCommitsDirectory(Configuration conf,
    +      String uuid) throws IOException {
    +    FileSystem fs = FileSystem.get(conf);
    +    return getMultipartUploadCommitsDirectory(fs, conf, uuid);
    +  }
    +
    +  /**
    +   * Build a temporary path for the multipart upload commit information
    +   * in the supplied filesystem (which is expected to be the cluster FS).
    +   * @param fs target FS
    +   * @param conf configuration
    +   * @param uuid uuid of job
    +   * @return a path which can be used for temporary work
    +   * @throws IOException on an IO failure.
    +   */
    +  static Path getMultipartUploadCommitsDirectory(FileSystem fs,
    +      Configuration conf, String uuid) throws IOException {
    +    return path(tempDirForStaging(fs, conf),
    +        UserGroupInformation.getCurrentUser().getShortUserName(),
    +        uuid,
    +        STAGING_UPLOADS);
    +  }
    +
    +  // TODO: verify this is correct, it comes from dse-storage
    +  private static Path localTemp(Configuration conf, int taskId, int attemptId)
    +      throws IOException {
    +    String[] dirs = conf.getTrimmedStrings(BUFFER_DIR);
    +    Random rand = new Random(Objects.hashCode(taskId, attemptId));
    +    String dir = dirs[rand.nextInt(dirs.length)];
    +
    +    return FileSystem.getLocal(conf).makeQualified(new Path(dir));
    +  }
    +
    +  /**
    +   * Returns the partition of a relative file path, or null if the path is a
    +   * file name with no relative directory.
    +   *
    +   * @param relative a relative file path
    +   * @return the partition of the relative file path
    +   */
    +  protected static String getPartition(String relative) {
    +    return getParent(relative);
    +  }
    +
    +  /**
    +   * Get the set of partitions from the list of files being staged.
    +   * This is all immediate parents of those files. If a file is in the root
    +   * dir, the partition is declared to be
    +   * {@link StagingCommitterConstants#TABLE_ROOT}.
    +   * @param attemptPath path for the attempt
    +   * @param taskOutput list of output files.
    +   * @return list of partitions.
    +   * @throws IOException IO failure
    +   */
    +  public static Set<String> getPartitions(Path attemptPath,
    +      List<FileStatus> taskOutput)
    +      throws IOException {
    +    // get a list of partition directories
    +    Set<String> partitions = Sets.newLinkedHashSet();
    +    for (FileStatus fileStatus : taskOutput) {
    +      // sanity check the output paths
    +      Path outputFile = fileStatus.getPath();
    +      if (!fileStatus.isFile()) {
    +        throw new PathIsDirectoryException(outputFile.toString());
    +      }
    +      String partition = getPartition(
    +          getRelativePath(attemptPath, outputFile));
    +      partitions.add(partition != null ? partition : TABLE_ROOT);
    +    }
    +
    +    return partitions;
    +  }
    +
    +  /**
    +   * path filter.
    +   */
    +  static final class HiddenPathFilter implements PathFilter {

    --- End diff --

    it's ubiquitous, isn't it? And in Java 8, we can reimplement it as a closure...
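For illustration, a minimal sketch of that Java 8 rewrite. PathFilter has a single accept(Path) method, so a lambda can stand in for the nested class; the filter semantics assumed here (skip names starting with "." or "_", the usual Hadoop convention for hidden and metadata files) and the class name HiddenPathFilterSketch are assumptions, since the body of HiddenPathFilter is not visible in this hunk:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public final class HiddenPathFilterSketch {

      // Hypothetical lambda replacement for the nested HiddenPathFilter class.
      // Assumes the filter hides names beginning with "." or "_", the common
      // Hadoop convention for hidden and metadata files.
      public static final PathFilter HIDDEN_FILE_FILTER = path -> {
        String name = path.getName();
        return !name.startsWith(".") && !name.startsWith("_");
      };

      // Example use: list only the visible files under a directory.
      public static FileStatus[] listVisible(FileSystem fs, Path dir)
          throws IOException {
        return fs.listStatus(dir, HIDDEN_FILE_FILTER);
      }

      private HiddenPathFilterSketch() {
      }
    }

A shared static constant keeps the lambda stateless and reusable across calls, which is the same property the nested singleton class provides.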
> Merge S3A committers into trunk
> -------------------------------
>
>                 Key: HADOOP-14971
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14971
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.0.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>
> Merge the HADOOP-13786 committer into trunk. This branch is being set up as a github PR for review there & to keep it out of the mailboxes of the watchers on the main JIRA



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)