nifi-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1107) Create new PutS3ObjectMultipart processor
Date Mon, 16 Nov 2015 19:11:11 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15007137#comment-15007137 ]

ASF GitHub Bot commented on NIFI-1107:
--------------------------------------

Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/121#discussion_r44967382
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3ObjectMultipart.java ---
    @@ -0,0 +1,550 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.AmazonClientException;
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.AmazonS3Client;
    +import com.amazonaws.services.s3.model.AccessControlList;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    +import com.amazonaws.services.s3.model.ObjectMetadata;
    +import com.amazonaws.services.s3.model.PartETag;
    +import com.amazonaws.services.s3.model.StorageClass;
    +import com.amazonaws.services.s3.model.UploadPartRequest;
    +import com.amazonaws.services.s3.model.UploadPartResult;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"Amazon", "S3", "AWS", "Archive", "Put", "Multi", "Multipart", "Upload"})
    +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket using the MultipartUpload
API method.  " +
    +        "This upload consists of three steps 1) initiate upload, 2) upload the parts,
and 3) complete the upload.\n" +
    +        "Since the intent for this processor involves large files, the processor saves
state locally after each step " +
    +        "so that an upload can be resumed without having to restart from the beginning
of the file.\n" +
    +        "The AWS libraries default to using standard AWS regions but the 'Endpoint Override
URL' allows this to be " +
    +        "overridden.")
    +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
    +        value = "The value of a User-Defined Metadata field to add to the S3 Object",
    +        description = "Allows user-defined metadata to be added to the S3 object as key/value
pairs",
    +        supportsExpressionLanguage = true)
    +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as
the filename for the S3 object")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where
the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.key", description = "The S3 key within where
the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.version", description = "The version of the
S3 Object that was put to S3"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
    +        @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used
to upload the Object to S3"),
    +        @WritesAttribute(attribute = "s3.expiration", description = "A human-readable
form of the expiration date of " +
    +                "the S3 object, if one is set"),
    +        @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable
form of the User Metadata " +
    +                "of the S3 object, if any was set")
    +})
    +public class PutS3ObjectMultipart extends AbstractS3Processor {
    +
    +    public static final long MIN_BYTES_INCLUSIVE = 50L * 1024L * 1024L;
    +    public static final long MAX_BYTES_INCLUSIVE = 5L * 1024L * 1024L * 1024L;
    +    public static final String PERSISTENCE_ROOT = "conf/state/";
    +
    +    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
    +            .name("Expiration Time Rule")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
    +            .name("Storage Class")
    +            .required(true)
    +            .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
    +            .defaultValue(StorageClass.Standard.name())
    +            .build();
    +
    +    public static final PropertyDescriptor PART_SIZE = new PropertyDescriptor.Builder()
    +            .name("Part Size")
    +            .description("Specifies the Part Size to be used for the S3 Multipart Upload
API.  The flow file will be " +
    +                    "broken into Part Size chunks during upload.  Part size must be at
least 50MB and no more than " +
    +                    "5GB, but the final part can be less than 50MB.")
    +            .required(true)
    +            .defaultValue("5 GB")
    +            .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_BYTES_INCLUSIVE, MAX_BYTES_INCLUSIVE))
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KEY, BUCKET, PART_SIZE, ENDPOINT_OVERRIDE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    SSL_CONTEXT_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
    +                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
    +
    +    final static String S3_BUCKET_KEY = "s3.bucket";
    +    final static String S3_OBJECT_KEY = "s3.key";
    +    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
    +    final static String S3_VERSION_ATTR_KEY = "s3.version";
    +    final static String S3_ETAG_ATTR_KEY = "s3.etag";
    +    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
    +    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    protected File getPersistenceFile() {
    +        return new File(PERSISTENCE_ROOT + getIdentifier());
    +    }
    +
    +    @Override
    +    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    +        if ( descriptor.equals(KEY)
    +                || descriptor.equals(BUCKET)
    +                || descriptor.equals(ENDPOINT_OVERRIDE)
    +                || descriptor.equals(STORAGE_CLASS)
    +                || descriptor.equals(REGION)) {
    +            destroyState();
    +        }
    +    }
    +
    +    protected MultipartState getState(final String s3ObjectKey) throws IOException {
    +        // get local state if it exists
    +        MultipartState currState = null;
    +        final File persistenceFile = getPersistenceFile();
    +        if (persistenceFile.exists()) {
    +            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
    +                final Properties props = new Properties();
    +                props.load(fis);
    +                if (props.containsKey(s3ObjectKey)) {
    +                    final String localSerialState = props.getProperty(s3ObjectKey);
    +                    if (localSerialState != null) {
    +                        currState = new MultipartState(localSerialState);
    +                        getLogger().info("Local state for {} loaded with uploadId {}
and {} partETags",
    +                                new Object[]{s3ObjectKey, currState.uploadId, currState.partETags.size()});
    +                    }
    +                }
    +            } catch (IOException ioe) {
    +                getLogger().warn("Failed to recover local state for {} due to {}. Assuming
no local state and " +
    +                                "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
    +            }
    +        }
    +        return currState;
    +    }
    +
    +    protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
    +        final String currStateStr = (currState == null) ? null : currState.toString();
    +        final File persistenceFile = getPersistenceFile();
    +        final File parentDir = persistenceFile.getParentFile();
    +        if (!parentDir.exists() && !parentDir.mkdirs()) {
    +            throw new IOException("Could not create persistence directory " + parentDir.getAbsolutePath()
+
    --- End diff --
    
    Agreed.
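    
    For reference, the three-step MultipartUpload flow that the capability description above outlines (initiate, upload the parts, complete) looks roughly like this with the AWS SDK classes the diff imports. This is a minimal sketch, not the processor's code: the bucket, key, and local file are hypothetical placeholders, and the resumable state handling is omitted.
    
        import com.amazonaws.services.s3.AmazonS3;
        import com.amazonaws.services.s3.AmazonS3Client;
        import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
        import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
        import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
        import com.amazonaws.services.s3.model.PartETag;
        import com.amazonaws.services.s3.model.UploadPartRequest;
        import com.amazonaws.services.s3.model.UploadPartResult;
        
        import java.io.File;
        import java.util.ArrayList;
        import java.util.List;
        
        public class MultipartUploadSketch {
            public static void main(String[] args) {
                final AmazonS3 s3 = new AmazonS3Client();             // credentials from the default provider chain
                final String bucket = "example-bucket";               // hypothetical bucket
                final String key = "large-object.bin";                // hypothetical key
                final File file = new File("/tmp/large-object.bin");  // hypothetical local file
                final long partSize = 50L * 1024L * 1024L;            // 50MB, the processor's minimum part size
        
                // Step 1: initiate the upload and keep the uploadId.
                final InitiateMultipartUploadResult init = s3.initiateMultipartUpload(
                        new InitiateMultipartUploadRequest(bucket, key));
                final String uploadId = init.getUploadId();
        
                // Step 2: upload the file in partSize chunks, collecting a PartETag per part.
                // Only the final part may be smaller than the configured part size.
                final List<PartETag> partETags = new ArrayList<>();
                long offset = 0L;
                for (int partNumber = 1; offset < file.length(); partNumber++) {
                    final long currentPartSize = Math.min(partSize, file.length() - offset);
                    final UploadPartResult result = s3.uploadPart(new UploadPartRequest()
                            .withBucketName(bucket)
                            .withKey(key)
                            .withUploadId(uploadId)
                            .withPartNumber(partNumber)
                            .withFile(file)
                            .withFileOffset(offset)
                            .withPartSize(currentPartSize));
                    partETags.add(result.getPartETag());
                    offset += currentPartSize;
                }
        
                // Step 3: complete the upload with the accumulated PartETags.
                s3.completeMultipartUpload(
                        new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
            }
        }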

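    The resumability the capability description promises rests on the getState/persistState pair in the diff, which keeps one serialized MultipartState per S3 object key in a java.util.Properties file under conf/state/. A rough standalone sketch of that pattern, assuming a hypothetical processor id and treating the state as an opaque string:
    
        import java.io.File;
        import java.io.FileInputStream;
        import java.io.FileOutputStream;
        import java.io.IOException;
        import java.util.Properties;
        
        public class StateFileSketch {
            // Mirrors PERSISTENCE_ROOT + getIdentifier() in the diff; the id here is hypothetical.
            private static final File STATE_FILE = new File("conf/state/example-processor-id");
        
            // Return the serialized state for one S3 object key, or null if none was saved.
            static String load(final String s3ObjectKey) throws IOException {
                if (!STATE_FILE.exists()) {
                    return null;
                }
                final Properties props = new Properties();
                try (FileInputStream fis = new FileInputStream(STATE_FILE)) {
                    props.load(fis);
                }
                return props.getProperty(s3ObjectKey);
            }
        
            // Save (or, when serializedState is null, clear) the state for one S3 object key.
            static void store(final String s3ObjectKey, final String serializedState) throws IOException {
                final File parentDir = STATE_FILE.getParentFile();
                if (!parentDir.exists() && !parentDir.mkdirs()) {
                    // The quoted diff ends at a similar throw in persistState.
                    throw new IOException("Could not create persistence directory " + parentDir.getAbsolutePath());
                }
                final Properties props = new Properties();
                if (STATE_FILE.exists()) {
                    try (FileInputStream fis = new FileInputStream(STATE_FILE)) {
                        props.load(fis);
                    }
                }
                if (serializedState == null) {
                    props.remove(s3ObjectKey);
                } else {
                    props.setProperty(s3ObjectKey, serializedState);
                }
                try (FileOutputStream fos = new FileOutputStream(STATE_FILE)) {
                    props.store(fos, null);
                }
            }
        }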

> Create new PutS3ObjectMultipart processor
> -----------------------------------------
>
>                 Key: NIFI-1107
>                 URL: https://issues.apache.org/jira/browse/NIFI-1107
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Joe Skora
>            Assignee: Joe Skora
>              Labels: s3
>             Fix For: 0.5.0
>
>
> A new `PutS3ObjectMultipart` processor that uses the AWS S3 API to upload files larger than those supported by `PutS3Object`, which has a [5GB limit|http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html].
> To support S3-compatible endpoints, this will also add an `Endpoint Override URL` property to `AbstractAWSProcessor` to set the service [endpoint|http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/AmazonWebServiceClient.html#setEndpoint(java.lang.String)], overriding the endpoint URL normally selected based on the Amazon region.
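
With the AWS Java SDK, the override in question is a single call on the client, per the setEndpoint javadoc linked above. A minimal sketch; the URL is a hypothetical placeholder for an S3-compatible service:

    import com.amazonaws.services.s3.AmazonS3Client;

    public class EndpointOverrideSketch {
        public static void main(String[] args) {
            final AmazonS3Client s3 = new AmazonS3Client();
            // Replace the region-derived endpoint with an explicit one;
            // "https://s3.example.internal:9000" is a placeholder.
            s3.setEndpoint("https://s3.example.internal:9000");
        }
    }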



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
