hadoop-common-issues mailing list archives

From "Aaron Fabbri (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HADOOP-13786) Add S3Guard committer for zero-rename commits to consistent S3 endpoints
Date Tue, 09 May 2017 19:34:04 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003372#comment-16003372 ]

Aaron Fabbri commented on HADOOP-13786:
---------------------------------------

I've been spending some time reviewing the patch. It is pretty large, so I haven't covered
100% of it, but here are some comments so far:

{code}
+ * This method was copied from
+ * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser}.
+ * @param <T> Type to marshal.
+ */
{code}

Can we move the dependency to a common place instead of copying?
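For example, a shared serializer could live somewhere in hadoop-common; a rough sketch (package and class names here are just an illustration, not a proposal):

{code}
// Hypothetical shared utility: one copy that both the registry and the
// committer code could depend on, instead of a copied class.
package org.apache.hadoop.util;

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerialization<T> {
  private final ObjectMapper mapper = new ObjectMapper();
  private final Class<T> classType;

  public JsonSerialization(Class<T> classType) {
    this.classType = classType;
  }

  public T fromJson(String json) throws IOException {
    return mapper.readValue(json, classType);
  }

  public String toJson(T instance) throws IOException {
    return mapper.writeValueAsString(instance);
  }
}
{code}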

{code}
/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, and so can be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputFormat} instances.
+ */
+public class PathOutputCommitterFactory extends Configured {
{code}
In the comment, did you mean "FileOutputCommitter instances"?

{code}
           bytes = putObject();
         }
       } else {
-        // there has already been at least one block scheduled for upload;
-        // put up the current then wait
-        if (hasBlock && block.hasData()) {
+        // there's an MPU in progress;
+        // IF there is more data to upload, or no data has yet been uploaded,
+        // PUT the final block
+        if (hasBlock &&
+            (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
{code}
This is to allow for zero-byte files, right?  Looks like you had to make similar changes
elsewhere. Did you cover this new behavior with a test case?
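Something like this would pin the behavior down (just a sketch against the S3A contract-test helpers; the exact base class and helper names are assumptions):

{code}
// Hypothetical test; assumes path() and getFileSystem() from the usual
// AbstractS3ATestBase / contract-test machinery, plus JUnit's assertEquals.
@Test
public void testZeroByteFilePut() throws Throwable {
  Path file = path("testZeroByteFilePut");
  FSDataOutputStream out = getFileSystem().create(file, true);
  out.close();  // nothing written: the stream must still complete the upload
  FileStatus status = getFileSystem().getFileStatus(file);
  assertEquals("length of empty file", 0, status.getLen());
}
{code}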

{code}
+  public void abortOutstandingMultipartUploads(long seconds)
+      throws IOException {
+    Preconditions.checkArgument(seconds >=0);
+    Date purgeBefore =
+        new Date(new Date().getTime() - seconds * 1000);
+    LOG.debug("Purging outstanding multipart uploads older than {}",
+        purgeBefore);
+    try {
+      transfers.abortMultipartUploads(bucket, purgeBefore);
+    } catch (AmazonServiceException e) {
+      throw translateException("purging multipart uploads", bucket, e);
+    }
{code}
I'm thinking about how multipart purge interacts with the magic committer.

Currently someone can configure fs.s3a.multipart.purge.age to 0.  I'm wondering if
we shouldn't increase the minimum or log a warning when the magic committer is also
enabled?  Maybe it at least deserves some special treatment in the docs?
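Something as small as this at initialize() time might be enough (a sketch; fs.s3a.multipart.purge and fs.s3a.multipart.purge.age are the existing keys, but the magic-committer enable key here is an assumption):

{code}
// Hypothetical warning in S3AFileSystem.initialize().
boolean purge = conf.getBoolean("fs.s3a.multipart.purge", false);
long purgeAge = conf.getLong("fs.s3a.multipart.purge.age", 86400);
boolean magicEnabled =
    conf.getBoolean("fs.s3a.committer.magic.enabled", false);
if (purge && magicEnabled && purgeAge == 0) {
  LOG.warn("fs.s3a.multipart.purge.age is 0: purging may abort"
      + " in-progress magic committer uploads");
}
{code}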

{code}
    * Returns the S3 client used by this filesystem.
    * @return AmazonS3Client
    */
-  AmazonS3 getAmazonS3Client() {
+  public AmazonS3 getAmazonS3Client() {
     return s3;
   }
{code}
Not @VisibleForTesting? Looks like you just want to subclass/override. Can we make it
protected instead?
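That is, combining the two:

{code}
@VisibleForTesting
protected AmazonS3 getAmazonS3Client() {
  return s3;
}
{code}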

{code}
    * @param key input key
    * @return the fully qualified path including URI scheme and bucket name.
    */
-  Path keyToQualifiedPath(String key) {
+  public Path keyToQualifiedPath(String key) {
{code}

Should we annotate the interface stability or audience here?
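For example, with the standard classification annotations (a sketch; whether Private or something broader is the right audience is exactly the question):

{code}
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public Path keyToQualifiedPath(String key) {
  // ...existing body unchanged...
}
{code}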

{code}
      * @return the upload result containing the ID
      * @throws IOException IO problem
      */
-    String initiateMultiPartUpload() throws IOException {
+    public String initiateMultiPartUpload() throws IOException {
       LOG.debug("Initiating Multipart upload");
       final InitiateMultipartUploadRequest initiateMPURequest =
           new InitiateMultipartUploadRequest(bucket,
{code}
Audience annotation?

{code}
+    /**
+     * Finalize a multipart PUT operation.
+     * This completes the upload, and, if that works, calls
+     * {@link #finishedWrite(String, long)} to update the filesystem.
+     * @param uploadId multipart operation Id
+     * @param partETags list of partial uploads
+     * @return the result of the operation.
+     * @throws IOException on problems.
+     */
{code}
Semi-related: I've been thinking of splitting up S3AFileSystem in the
future.  Seems like we could separate WriteOperationHelper, and maybe put
other S3-level operations (i.e. below anything that cares about MetadataStore)
into a separate class.
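Roughly this shape, say (purely a sketch; the name and the exact method set are made up):

{code}
// Hypothetical extraction: raw S3 operations with no MetadataStore
// awareness. S3AFileSystem would compose this and layer the metadata
// updates (finishedWrite() etc.) on top.
import java.io.IOException;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;

interface RawS3Operations {
  String initiateMultiPartUpload(String key) throws IOException;
  PutObjectResult putObject(PutObjectRequest request) throws IOException;
  int abortMultipartUploadsUnderPath(String prefix) throws IOException;
}
{code}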

{code}
+    public int abortMultipartUploadsUnderPath(String prefix)
+        throws IOException {
+      int count = 0;
+      for (MultipartUpload upload : listMultipartUploads(prefix)) {
+        try {
+          abortMultipartCommit(upload);
+          count++;
+        } catch (FileNotFoundException e) {
+          LOG.debug("Already aborted: {}", upload.getKey(), e);
{code}
I wonder if we'll need to deal with retryable exceptions here, instead of
propagating.
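For instance, a bounded retry per upload (a sketch; the attempt limit is a placeholder, and ideally only genuinely retryable failures would loop):

{code}
// Hypothetical retry loop around the existing abort; FileNotFoundException
// still means "already gone", anything else gets a few more tries.
for (MultipartUpload upload : listMultipartUploads(prefix)) {
  int attempts = 0;
  while (true) {
    try {
      abortMultipartCommit(upload);
      count++;
      break;
    } catch (FileNotFoundException e) {
      LOG.debug("Already aborted: {}", upload.getKey(), e);
      break;
    } catch (IOException e) {
      if (++attempts >= 3) {
        throw e;
      }
      LOG.debug("Retrying abort of {}", upload.getKey(), e);
    }
  }
}
{code}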

{code}
+     * @return the upload initiated
+     * @throws IOException on problems
+     */
+    public PutObjectResult putObjectAndFinalize(
+        PutObjectRequest putObjectRequest,
+        int length) throws IOException {
+      PutObjectResult result = putObject(putObjectRequest);
+      finishedWrite(putObjectRequest.getKey(), length);
{code}

Also wondering about error handling / retries here.  I don't know if there
is a difference between direct put and TransferManager in terms of built-in
retry logic.

{code}
+    /**
+     * Revert a commit by deleting the file.
+     * TODO: Policy regarding creating a mock empty parent directory.
+     * @param destKey destination key
+     * @throws IOException due to inability to delete a directory or file.
+     */
+    public void revertCommit(String destKey) throws IOException {
{code}

Does the directory need to exist after revert?  Can we just say it doesn't?

{code}
+  /**
+   * Delete a path quietly: failures are logged at DEBUG.
+   * @param fs filesystem
+   * @param path path
+   * @param recursive recursive?
+   */
+  public static void deleteWithWarning(FileSystem fs,
{code}
s/quietly: failures are logged at DEBUG/: failures are logged at WARN/

{code}
+public abstract class Abstract3GuardCommitterFactory
{code}
"AbstractS3GuardCommitterFactory" (Missing the 'S')

{code}
+  public static List<String> splitPathToElements(Path path) {
+    String uriPath = path.toUri().getPath();
+    checkArgument(!uriPath.isEmpty(), "empty path");
+    checkArgument(uriPath.charAt(0) == '/', "path is relative");
+    if ("/".equals(uriPath)) {
+      // special case: empty list
+      return new ArrayList<>(0);
+    }
+    path.depth();
{code}
Meant to delete this last line?

{code}
+  /**
+   * Verify that that path is a delayed commit path.
{code}
nit: s/that that/that the/ or s/that that/that/

{code}
+
+/**
+ * Adds the code needed for S3A integration.
+ * It's pulled out to keep S3A FS class slightly less complex.
{code}
Thanks for the separation here.

{code}
+ * Configuration options:
+ * <pre>
+ *   : temporary local FS directory
+ *   : intermediate directory on a cluster-wide FS (can be HDFS or a consistent
+ *   s3 endpoint).
{code}
Couldn't parse this comment.  Is there supposed to be a config name before the ':'?
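Presumably each line should start with its option name, e.g. (these keys are placeholders I made up, not the patch's real ones):

{code}
 * Configuration options:
 * <pre>
 *   fs.s3a.committer.staging.tmp.path : temporary local FS directory
 *   fs.s3a.committer.staging.pending.path : intermediate directory on a
 *     cluster-wide FS (can be HDFS or a consistent s3 endpoint).
 * </pre>
{code}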

{code}
+  public void testConcurrentCommitTaskWithSubDir() throws Exception {
+    Job job = newJob();
+    FileOutputFormat.setOutputPath(job, outDir);
+    final Configuration conf = job.getConfiguration();
+/*
+
+    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    FileSystem.closeAll();
+*/
{code}
There are a couple of spots with commented-out code and TODOs still.  Note to clean these
up.

{code}
+    // if this fails with "directory is already locked" set umask to 0022
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    //cluster = new MiniDFSCluster.Builder(new Configuration()).build();
{code}
ditto.

{code}
+/*
+  @Override
+  protected void deleteObject(String key) throws InvalidRequestException {
+    // force into sharing the existing mock entry point
+    getAmazonS3Client().deleteObject(new DeleteObjectRequest(getBucket(), key));
+  }
+*/
{code}
same.

{code}
+
+    @Override
+    public void setFaults(Faults... faults) {
+      injection.setFaults(faults);
{code}
I only saw this used once, in AbstractITCommitProtocol, which sets a commitJob failure.
Curious to hear your plans for more fault injection.  Hope we can discuss in a call soon.

Finally, a reminder that your log4j.properties diff will need to be commented out or removed
eventually.  The settings here are useful for testing and dev though.


> Add S3Guard committer for zero-rename commits to consistent S3 endpoints
> ------------------------------------------------------------------------
>
>                 Key: HADOOP-13786
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13786
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: fs/s3
>    Affects Versions: HADOOP-13345
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13786-HADOOP-13345-001.patch, HADOOP-13786-HADOOP-13345-002.patch,
HADOOP-13786-HADOOP-13345-003.patch, HADOOP-13786-HADOOP-13345-004.patch, HADOOP-13786-HADOOP-13345-005.patch,
HADOOP-13786-HADOOP-13345-006.patch, HADOOP-13786-HADOOP-13345-007.patch,
HADOOP-13786-HADOOP-13345-009.patch, HADOOP-13786-HADOOP-13345-010.patch, HADOOP-13786-HADOOP-13345-011.patch,
HADOOP-13786-HADOOP-13345-012.patch, HADOOP-13786-HADOOP-13345-013.patch, HADOOP-13786-HADOOP-13345-015.patch,
HADOOP-13786-HADOOP-13345-016.patch, HADOOP-13786-HADOOP-13345-017.patch, HADOOP-13786-HADOOP-13345-018.patch,
HADOOP-13786-HADOOP-13345-019.patch, HADOOP-13786-HADOOP-13345-020.patch, HADOOP-13786-HADOOP-13345-021.patch,
HADOOP-13786-HADOOP-13345-022.patch, HADOOP-13786-HADOOP-13345-023.patch, HADOOP-13786-HADOOP-13345-024.patch,
HADOOP-13786-HADOOP-13345-025.patch, objectstore.pdf, s3committer-master.zip
>
>
> A goal of this code is "support O(1) commits to S3 repositories in the presence of failures".
Implement it, including whatever is needed to demonstrate the correctness of the algorithm.
(that is, assuming that s3guard provides a consistent view of the presence/absence of blobs,
show that we can commit directly).
> I consider us free to expose the blobstore-ness of the s3 output streams (i.e.
not visible until the close()), if we need to use that to allow us to abort commit operations.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
