Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BFF16200B2F for ; Sun, 19 Jun 2016 07:19:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BEBF6160A50; Sun, 19 Jun 2016 05:19:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E2CF5160A63 for ; Sun, 19 Jun 2016 07:19:06 +0200 (CEST) Received: (qmail 55270 invoked by uid 500); 19 Jun 2016 05:19:05 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 55257 invoked by uid 99); 19 Jun 2016 05:19:05 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 19 Jun 2016 05:19:05 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 9D6442C1F5D for ; Sun, 19 Jun 2016 05:19:05 +0000 (UTC) Date: Sun, 19 Jun 2016 05:19:05 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: commits@nifi.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (NIFI-1022) Create GetTachyon and PutTachyon Processors MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Sun, 19 Jun 2016 05:19:07 -0000 [ https://issues.apache.org/jira/browse/NIFI-1022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338378#comment-15338378 ] ASF GitHub Bot commented on NIFI-1022: -------------------------------------- Github user apiri commented on a diff in the pull request: 
https://github.com/apache/nifi/pull/379#discussion_r67610439 --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.ReadType; +import alluxio.client.file.FileInStream; +import alluxio.client.file.URIStatus; +import alluxio.client.file.options.OpenFileOptions; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"alluxio", "tachyon", "get", "file"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of " + + "the remote file to the content of the incoming FlowFile.") +public class GetAlluxio extends AbstractAlluxioProcessor { + + public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder() + .name("alluxio-read-type") + .displayName("Read type") + .description("The Read Type to use when accessing the remote file") + .defaultValue(ReadType.CACHE_PROMOTE.toString()) + .required(true) + .allowableValues(ReadType.values()) + .build(); + + private final static List propertyDescriptors; + + // 
Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All files successfully retrieved are routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of failure, flow files will be routed to this relationship") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() + .name("original") + .description("In case of success, the original FlowFile will be routed to this relationship") + .autoTerminateDefault(true) + .build(); + + private final static Set relationships; + + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(READ_TYPE); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + _relationships.add(REL_SUCCESS_REQ); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + @OnScheduled + public void onScheduled(final ProcessContext context) { + createFileSystem(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile request = null; + if (context.hasIncomingConnection()) { + request = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. 
+ // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (request == null && context.hasNonLoopConnection()) { + return; + } + } + + final StopWatch stopWatch = new StopWatch(true); + final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue(); + final AlluxioURI path = new AlluxioURI(uri); + final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue())); + + FileInStream in = null; + FlowFile flowFile = null; + + if(request == null) { + flowFile = session.create(request); + } else { + flowFile = session.create(); + } + + try { + final URIStatus status = fileSystem.get().getStatus(path); + flowFile = updateFlowFile(status, flowFile, session); + + in = fileSystem.get().openFile(path, options); + final FileInStream toCopy = in; + + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + IOUtils.copy(toCopy, out); --- End diff -- This is throwing errors in some cases and not bubbling out to the wrapping catch so there is no indication of issues. 
One particular instance was: > 2016-06-19 01:12:52,512 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.alluxio.PutAlluxio PutAlluxio[id=37744f8c-57fe-48c9-ac0b-e3fd255efc4b] An error occurred while writing file /nifi-630c823e-7492-4539-bf14-632dfa4b85e2; transferring to 'failure': java.lang.RuntimeException: No available Alluxio worker found 2016-06-19 01:12:52,513 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.alluxio.PutAlluxio java.lang.RuntimeException: No available Alluxio worker found at alluxio.client.block.AlluxioBlockStore.getOutStream(AlluxioBlockStore.java:174) ~[alluxio-core-client-1.0.1.jar:na] at alluxio.client.file.FileOutStream.getNextBlock(FileOutStream.java:266) ~[alluxio-core-client-1.0.1.jar:na] at alluxio.client.file.FileOutStream.write(FileOutStream.java:231) ~[alluxio-core-client-1.0.1.jar:na] at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1793) ~[commons-io-2.4.jar:2.4] at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1769) ~[commons-io-2.4.jar:2.4] at org.apache.commons.io.IOUtils.copy(IOUtils.java:1744) ~[commons-io-2.4.jar:2.4] > Create GetTachyon and PutTachyon Processors > ------------------------------------------- > > Key: NIFI-1022 > URL: https://issues.apache.org/jira/browse/NIFI-1022 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Jeremy Dyer > Assignee: Pierre Villard > Priority: Minor > Attachments: Alluxio.xml > > > Provide support for Apache Tachyon by implementing a GetTachyon and PutTachyon processor. Having the ability to both read and write to Tachyon would assist in sharing data with external applications such as Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332)