nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jvwing <...@git.apache.org>
Subject [GitHub] nifi pull request #1482: NIFI-3449: Adding GCP Framework and GCS Processors
Date Fri, 10 Feb 2017 06:05:13 GMT
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1482#discussion_r100471948
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
---
    @@ -0,0 +1,523 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.storage;
    +
    +import com.google.cloud.storage.Acl;
    +import com.google.cloud.storage.Blob;
    +import com.google.cloud.storage.BlobId;
    +import com.google.cloud.storage.BlobInfo;
    +import com.google.cloud.storage.Storage;
    +import com.google.cloud.storage.StorageException;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static com.google.cloud.storage.Storage.PredefinedAcl.ALL_AUTHENTICATED_USERS;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.AUTHENTICATED_READ;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_FULL_CONTROL;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.PRIVATE;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.PROJECT_PRIVATE;
    +import static com.google.cloud.storage.Storage.PredefinedAcl.PUBLIC_READ;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
    +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
    +
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "gcs", "archive", "put"})
    +@CapabilityDescription("Puts flow files to a Google Cloud Bucket.")
    +@SeeAlso({FetchGCSObject.class, DeleteGCSObject.class, ListGCSBucket.class})
    +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the GCS
Object",
    +        value = "The value of a User-Defined Metadata field to add to the GCS Object",
    +        description = "Allows user-defined metadata to be added to the GCS object as
key/value pairs",
    +        supportsExpressionLanguage = true)
    +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as
the filename for the GCS object")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BUCKET_ATTR, description = BUCKET_DESC),
    +        @WritesAttribute(attribute = KEY_ATTR, description = KEY_DESC),
    +        @WritesAttribute(attribute = SIZE_ATTR, description = SIZE_DESC),
    +        @WritesAttribute(attribute = CACHE_CONTROL_ATTR, description = CACHE_CONTROL_DESC),
    +        @WritesAttribute(attribute = COMPONENT_COUNT_ATTR, description = COMPONENT_COUNT_DESC),
    +        @WritesAttribute(attribute = CONTENT_DISPOSITION_ATTR, description = CONTENT_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = CONTENT_ENCODING_ATTR, description = CONTENT_ENCODING_DESC),
    +        @WritesAttribute(attribute = CONTENT_LANGUAGE_ATTR, description = CONTENT_LANGUAGE_DESC),
    +        @WritesAttribute(attribute = CONTENT_TYPE_ATTR, description = CONTENT_TYPE_DESC),
    +        @WritesAttribute(attribute = CRC32C_ATTR, description = CRC32C_DESC),
    +        @WritesAttribute(attribute = CREATE_TIME_ATTR, description = CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = UPDATE_TIME_ATTR, description = UPDATE_TIME_DESC),
    +        @WritesAttribute(attribute = ENCRYPTION_ALGORITHM_ATTR, description = ENCRYPTION_ALGORITHM_DESC),
    +        @WritesAttribute(attribute = ENCRYPTION_SHA256_ATTR, description = ENCRYPTION_SHA256_DESC),
    +        @WritesAttribute(attribute = ETAG_ATTR, description = ETAG_DESC),
    +        @WritesAttribute(attribute = GENERATED_ID_ATTR, description = GENERATED_ID_DESC),
    +        @WritesAttribute(attribute = GENERATION_ATTR, description = GENERATION_DESC),
    +        @WritesAttribute(attribute = MD5_ATTR, description = MD5_DESC),
    +        @WritesAttribute(attribute = MEDIA_LINK_ATTR, description = MEDIA_LINK_DESC),
    +        @WritesAttribute(attribute = METAGENERATION_ATTR, description = METAGENERATION_DESC),
    +        @WritesAttribute(attribute = OWNER_ATTR, description = OWNER_DESC),
    +        @WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC),
    +        @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
    +})
    +public class PutGCSObject extends AbstractGCSProcessor {
    +    public static final PropertyDescriptor KEY = new PropertyDescriptor
    +            .Builder().name("gcs-key")
    +            .displayName("Key")
    +            .description(KEY_DESC)
    +            .required(true)
    +            .defaultValue("${filename}")
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor
    +            .Builder().name("gcs-content-type")
    +                      .displayName("Content Type")
    +                      .description("Content Type for the file, i.e. text/plain")
    +                      .required(false)
    +                      .expressionLanguageSupported(true)
    +                      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                      .build();
    +
    +    public static final PropertyDescriptor MD5 = new PropertyDescriptor
    +            .Builder().name("gcs-object-md5")
    +            .displayName("MD5 Hash")
    +            .description("MD5 Hash (encoded in Base64) of the file for server-side validation.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor CRC32C = new PropertyDescriptor
    +            .Builder().name("gcs-object-crc32c")
    +            .displayName("CRC32C Checksum")
    +            .description("CRC32C Checksum (encoded in Base64, big-Endian order) of the
file for server-side validation.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final AllowableValue ACL_ALL_AUTHENTICATED_USERS = new AllowableValue(
    +            ALL_AUTHENTICATED_USERS.name(), "All Authenticated Users", "Gives the bucket
or object owner OWNER " +
    +            "permission, and gives all authenticated Google account holders READER and
WRITER permissions. " +
    +            "All other permissions are removed."
    +    );
    +
    +    public static final AllowableValue ACL_AUTHENTICATED_READ = new AllowableValue(
    +            AUTHENTICATED_READ.name(), "Authenticated Read", "Gives the bucket or object
owner OWNER permission, " +
    +            "and gives all authenticated Google account holders READER permission. All
other permissions are removed."
    +    );
    +
    +    public static final AllowableValue ACL_BUCKET_OWNER_FULL_CONTROL = new AllowableValue(
    +            BUCKET_OWNER_FULL_CONTROL.name(), "Bucket Owner Full Control", "Gives the
object and bucket owners OWNER " +
    +            "permission. All other permissions are removed."
    +    );
    +
    +    public static final AllowableValue ACL_BUCKET_OWNER_READ = new AllowableValue(
    +            BUCKET_OWNER_READ.name(), "Bucket Owner Read Only", "Gives the object owner
OWNER permission, and gives " +
    +            "the bucket owner READER permission. All other permissions are removed."
    +    );
    +
    +    public static final AllowableValue ACL_PRIVATE = new AllowableValue(
    +            PRIVATE.name(), "Private", "Gives the bucket or object owner OWNER permission
for a bucket or object, " +
    +            "and removes all other access permissions."
    +    );
    +
    +    public static final AllowableValue ACL_PROJECT_PRIVATE = new AllowableValue(
    +            PROJECT_PRIVATE.name(), "Project Private", "Gives permission to the project
team based on their roles. " +
    +            "Anyone who is part of the team has READER permission. Project owners and
project editors have OWNER " +
    +            "permission. This is the default ACL for newly created buckets. This is also
the default ACL for newly " +
    +            "created objects unless the default object ACL for that bucket has been changed."
    +    );
    +
    +    public static final AllowableValue ACL_PUBLIC_READ = new AllowableValue(
    +            PUBLIC_READ.name(), "Public Read Only", "Gives the bucket or object owner
OWNER permission, and gives all " +
    +            "users, both authenticated and anonymous, READER permission. When you apply
this to an object, anyone on " +
    +            "the Internet can read the object without authenticating."
    +    );
    +
    +    public static final PropertyDescriptor ACL = new PropertyDescriptor.Builder()
    +            .name("gcs-object-acl")
    +            .displayName("Object ACL")
    +            .description("Access Control to be attached to the object uploaded. Not providing
this will revert to bucket defaults.")
    +            .required(false)
    +            .allowableValues(
    +                    ACL_ALL_AUTHENTICATED_USERS,
    +                    ACL_AUTHENTICATED_READ,
    +                    ACL_BUCKET_OWNER_FULL_CONTROL,
    +                    ACL_BUCKET_OWNER_READ,
    +                    ACL_PRIVATE,
    +                    ACL_PROJECT_PRIVATE,
    +                    ACL_PUBLIC_READ)
    +            .build();
    +
    +    public static final PropertyDescriptor ENCRYPTION_KEY = new PropertyDescriptor.Builder()
    +            .name("gcs-server-side-encryption-key")
    +            .displayName("Server Side Encryption Key")
    +            .description("An AES256 Encryption Key (encoded in base64) for server-side
encryption of the object.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("gcs-overwrite-object")
    +            .displayName("Overwrite Object")
    +            .description("If false, the upload to GCS will succeed only if the object
does not exist.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final AllowableValue CD_INLINE = new AllowableValue(
    +            "inline", "Inline", "Indicates that the object should be loaded and rendered
within the browser."
    +    );
    +
    +    public static final AllowableValue CD_ATTACHMENT = new AllowableValue(
    +            "attachment", "Attachment", "Indicates that the object should be saved (using
a Save As... dialog) rather " +
    +            "than opened directly within the browser"
    +    );
    +
    +    public static final PropertyDescriptor CONTENT_DISPOSITION_TYPE = new PropertyDescriptor.Builder()
    +            .name("gcs-content-disposition-type")
    +            .displayName("Content Disposition Type")
    +            .description("Type of RFC-6266 Content Disposition to be attached to the
object")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues(CD_INLINE, CD_ATTACHMENT)
    +            .build();
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(KEY)
    +                .add(CONTENT_TYPE)
    +                .add(MD5)
    +                .add(CRC32C)
    +                .add(ACL)
    +                .add(ENCRYPTION_KEY)
    +                .add(OVERWRITE)
    +                .add(CONTENT_DISPOSITION_TYPE)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName)
{
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        final String bucket = context.getProperty(BUCKET)
    +                .evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String key = context.getProperty(KEY)
    +                .evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +        final FlowFile ff = flowFile;
    +        final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        try {
    +            final Storage storage = getCloudService();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream rawIn) throws IOException {
    +                    try (final InputStream in = new BufferedInputStream(rawIn)) {
    +                        final BlobId id = BlobId.of(bucket, key);
    +                        final BlobInfo.Builder blobInfoBuilder = BlobInfo.newBuilder(id);
    +                        final List<Storage.BlobWriteOption> blobWriteOptions =
new ArrayList<>();
    +
    +                        if (!overwrite) {
    +                            blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
    +                        }
    +
    +                        final String contentDispositionType = context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
    +                        if (contentDispositionType != null) {
    +                            blobInfoBuilder.setContentDisposition(contentDispositionType
+ "; filename=" + ffFilename);
    +                        }
    +
    +                        final String contentType = context.getProperty(CONTENT_TYPE)
    +                                .evaluateAttributeExpressions(ff).getValue();
    +                        if (contentType != null) {
    +                            blobInfoBuilder.setContentType(contentType);
    +                        }
    +
    +                        final String md5 = context.getProperty(MD5)
    +                                .evaluateAttributeExpressions(ff).getValue();
    +                        if (md5 != null) {
    +                            blobInfoBuilder.setMd5(md5);
    +                            blobWriteOptions.add(Storage.BlobWriteOption.md5Match());
    +                        }
    +
    +                        final String crc32c = context.getProperty(CRC32C)
    +                                .evaluateAttributeExpressions(ff).getValue();
    +                        if (crc32c != null) {
    +                            blobInfoBuilder.setCrc32c(crc32c);
    +                            blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
    +                        }
    +
    +                        final String acl = context.getProperty(ACL).getValue();
    +                        if (acl != null) {
    +                            blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
    +                                    Storage.PredefinedAcl.valueOf(acl)
    +                            ));
    +                        }
    +
    +                        final String encryptionKey = context.getProperty(ENCRYPTION_KEY)
    +                                .evaluateAttributeExpressions(ff).getValue();
    +                        if (encryptionKey != null) {
    +                            blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
    +                        }
    +
    +                        final HashMap<String, String> userMetadata = new HashMap<>();
    +                        for (final Map.Entry<PropertyDescriptor, String> entry
: context.getProperties().entrySet()) {
    +                            if (entry.getKey().isDynamic()) {
    +                                final String value = context.getProperty(
    +                                        entry.getKey()).evaluateAttributeExpressions(ff).getValue();
    +                                userMetadata.put(entry.getKey().getName(), value);
    +                            }
    +                        }
    +
    +                        if (!userMetadata.isEmpty()) {
    +                            blobInfoBuilder.setMetadata(userMetadata);
    +                        }
    +
    +                        try {
    +
    +                            //TODO: Take advantage of RestorableState to checkpoint large
blob uploads
    +                            final Blob blob = storage.create(blobInfoBuilder.build(),
    +                                    in,
    +                                    blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()])
    +                            );
    +
    +                            // Create attributes
    +                            attributes.put(BUCKET_ATTR, blob.getBucket());
    +                            attributes.put(KEY_ATTR, blob.getName());
    +
    +
    +                            if (blob.getSize() != null) {
    +                                attributes.put(SIZE_ATTR, String.valueOf(blob.getSize()));
    +                            }
    +
    +                            if (blob.getCacheControl() != null) {
    +                                attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl());
    +                            }
    +
    +                            if (blob.getComponentCount() != null) {
    +                                attributes.put(COMPONENT_COUNT_ATTR, String.valueOf(blob.getComponentCount()));
    +                            }
    +
    +                            if (blob.getContentDisposition() != null) {
    +                                attributes.put(CONTENT_DISPOSITION_ATTR, blob.getContentDisposition());
    +                                final Util.ParsedContentDisposition parsed = Util.parseContentDisposition(blob.getContentDisposition());
    +
    +                                if (parsed != null) {
    +                                    attributes.put(CoreAttributes.FILENAME.key(), parsed.getFileName());
    +                                }
    +                            }
    +
    +                            if (blob.getContentEncoding() != null) {
    +                                attributes.put(CONTENT_ENCODING_ATTR, blob.getContentEncoding());
    +                            }
    +
    +                            if (blob.getContentLanguage() != null) {
    +                                attributes.put(CONTENT_LANGUAGE_ATTR, blob.getContentLanguage());
    +                            }
    +
    +                            if (blob.getContentType() != null) {
    +                                attributes.put(CONTENT_TYPE_ATTR, blob.getContentType());
    +                            }
    +
    +                            if (blob.getCrc32c() != null) {
    +                                attributes.put(CRC32C_ATTR, blob.getCrc32c());
    +                            }
    +
    +                            if (blob.getCustomerEncryption() != null) {
    +                                final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
    +
    +                                attributes.put(ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm());
    +                                attributes.put(ENCRYPTION_SHA256_ATTR, encryption.getKeySha256());
    +                            }
    +
    +                            if (blob.getEtag() != null) {
    +                                attributes.put(ETAG_ATTR, blob.getEtag());
    +                            }
    +
    +                            if (blob.getGeneratedId() != null) {
    +                                attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId());
    +                            }
    +
    +                            if (blob.getGeneration() != null) {
    +                                attributes.put(GENERATION_ATTR, String.valueOf(blob.getGeneration()));
    +                            }
    +
    +                            if (blob.getMd5() != null) {
    +                                attributes.put(MD5_ATTR, blob.getMd5());
    +                            }
    +
    +                            if (blob.getMediaLink() != null) {
    +                                attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
    +                            }
    +
    +                            if (blob.getMetageneration() != null) {
    +                                attributes.put(METAGENERATION_ATTR, String.valueOf(blob.getMetageneration()));
    +                            }
    +
    +                            if (blob.getOwner() != null) {
    +                                final Acl.Entity entity = blob.getOwner();
    +
    +                                if (entity instanceof Acl.User) {
    +                                    attributes.put(OWNER_ATTR, ((Acl.User) entity).getEmail());
    +                                    attributes.put(OWNER_TYPE_ATTR, "user");
    +                                } else if (entity instanceof Acl.Group) {
    +                                    attributes.put(OWNER_ATTR, ((Acl.Group) entity).getEmail());
    +                                    attributes.put(OWNER_TYPE_ATTR, "group");
    +                                } else if (entity instanceof Acl.Domain) {
    +                                    attributes.put(OWNER_ATTR, ((Acl.Domain) entity).getDomain());
    +                                    attributes.put(OWNER_TYPE_ATTR, "domain");
    +                                } else if (entity instanceof Acl.Project) {
    +                                    attributes.put(OWNER_ATTR, ((Acl.Project) entity).getProjectId());
    +                                    attributes.put(OWNER_TYPE_ATTR, "project");
    +                                }
    +                            }
    +
    +                            if (blob.getSelfLink() != null) {
    +                                attributes.put(URI_ATTR, blob.getSelfLink());
    +                            }
    +
    +                            if (blob.getCreateTime() != null) {
    +                                attributes.put(CREATE_TIME_ATTR, String.valueOf(blob.getCreateTime()));
    +                            }
    +
    +                            if (blob.getUpdateTime() != null) {
    +                                attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTime()));
    +                            }
    +                        } catch (StorageException e) {
    +                            getLogger().info("Failure completing upload flowfile={} bucket={}
key={} reason={}",
    --- End diff --
    
    Should this be `error(...)` instead of `info(...)`?  I see the exception gets logged as
an error below on 518, but not with the same level of helpful detail.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message