From commits-return-40639-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Tue Mar 12 19:39:58 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0509218078A for ; Tue, 12 Mar 2019 20:39:55 +0100 (CET) Received: (qmail 98221 invoked by uid 500); 12 Mar 2019 19:39:55 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 98207 invoked by uid 99); 12 Mar 2019 19:39:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Mar 2019 19:39:55 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 1C867879DF; Tue, 12 Mar 2019 19:39:54 +0000 (UTC) Date: Tue, 12 Mar 2019 19:39:57 +0000 To: "commits@nifi.apache.org" Subject: [nifi] 05/21: NIFI-5977 - Add "Minimum/Maximum File Age" Parameter to ListSFTP MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: joewitt@apache.org In-Reply-To: <155241959255.4048.11474010104598514683@gitbox.apache.org> References: <155241959255.4048.11474010104598514683@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: nifi X-Git-Refname: refs/heads/support/nifi-1.9.x X-Git-Reftype: branch X-Git-Rev: d99db837dac72b8290b491f41017e6ee112c3886 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190312193954.1C867879DF@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.9.x in repository https://gitbox.apache.org/repos/asf/nifi.git commit d99db837dac72b8290b491f41017e6ee112c3886 Author: Arpad Boda AuthorDate: Thu Feb 21 14:37:33 2019 +0100 NIFI-5977 - Add "Minimum/Maximum File Age" Parameter to ListSFTP Signed-off-by: Pierre Villard This closes #3324. --- .../nifi-standard-processors/pom.xml | 6 + .../apache/nifi/processors/standard/ListSFTP.java | 57 ++++++++++ .../nifi/processors/standard/TestListSFTP.java | 126 +++++++++++++++++++++ 3 files changed, 189 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 4ac3bf7..517f782 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -348,6 +348,12 @@ 1.9.1-SNAPSHOT test + + com.github.stefanbirkner + fake-sftp-server-rule + 2.0.1 + test + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 89012b0..ccdf1d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -17,9 +17,14 @@ package org.apache.nifi.processors.standard; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -31,13 +36,16 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.processors.standard.util.FileInfo; import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -68,6 +76,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.") public class ListSFTP extends ListFileTransfer { + private final AtomicReference> fileFilterRef = new AtomicReference(); + @Override protected List getSupportedPropertyDescriptors() { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); @@ -102,6 +112,10 @@ public class ListSFTP extends ListFileTransfer { properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); + properties.add(ListFile.MIN_AGE); + properties.add(ListFile.MAX_AGE); + properties.add(ListFile.MIN_SIZE); + properties.add(ListFile.MAX_SIZE); return properties; } @@ -126,4 +140,47 @@ public class ListSFTP extends ListFileTransfer { protected void customValidate(ValidationContext validationContext, Collection results) { SFTPTransfer.validateProxySpec(validationContext, results); } + + @Override + protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + final List listing = super.performListing(context, minTimestamp); + + return listing.stream() + .filter(fileFilterRef.get()) + .collect(Collectors.toList()); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + fileFilterRef.set(createFileFilter(context)); + } + + private Predicate createFileFilter(final ProcessContext context) { + final long minSize = context.getProperty(ListFile.MIN_SIZE).asDataSize(DataUnit.B).longValue(); + final Double maxSize = context.getProperty(ListFile.MAX_SIZE).asDataSize(DataUnit.B); + final long minAge = context.getProperty(ListFile.MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + + return (attributes) -> { + if(attributes.isDirectory()) { + return true; + } + + if (minSize > attributes.getSize()) { + return false; + } + if (maxSize != null && maxSize < attributes.getSize()) { + return false; + } + final long fileAge = System.currentTimeMillis() - attributes.getLastModifiedTime(); + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + + return true; + }; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java new file mode 100644 index 0000000..68a217d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processors.standard.util.FTPTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import java.security.SecureRandom; + +public class TestListSFTP { + @Rule + public final FakeSftpServerRule sftpServer = new FakeSftpServerRule(); + int port; + + final String username = "nifi-sftp-user"; + final String password = "Test test test chocolate"; + + @Before + public void setUp() throws Exception { + sftpServer.addUser(username, password); + port = sftpServer.getPort(); + + + sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8); + + sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8); + + byte[] bytes = new byte[120]; + SecureRandom.getInstanceStrong().nextBytes(bytes); + + sftpServer.putFile("/directory/file.bin", bytes); + } + + @After + public void tearDown() throws Exception { + sftpServer.deleteAllFilesAndDirectories(); + } + + @Test + public void basicFileList() throws InterruptedException { + TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); + runner.setProperty(ListSFTP.HOSTNAME, "localhost"); + runner.setProperty(ListSFTP.USERNAME, username); + runner.setProperty(SFTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); + runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); + + runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); + runner.assertValid(); + + // Ensure wait for enough lag time. + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); + + runner.run(); + + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); + + runner.assertAllFlowFilesContainAttribute("sftp.remote.host"); + runner.assertAllFlowFilesContainAttribute("sftp.remote.port"); + runner.assertAllFlowFilesContainAttribute("sftp.listing.user"); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute( "filename"); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); + retrievedFile.assertAttributeEquals("sftp.listing.user", username); + } + + + @Test + public void sizeFilteredFileList() throws InterruptedException { + TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); + runner.setProperty(ListSFTP.HOSTNAME, "localhost"); + runner.setProperty(ListSFTP.USERNAME, username); + runner.setProperty(SFTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); + runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); + runner.setProperty(ListFile.MIN_SIZE, "8B"); + runner.setProperty(ListFile.MAX_SIZE, "100B"); + + + runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); + runner.assertValid(); + + // Ensure wait for enough lag time. + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); + + runner.run(); + + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); + //the only file between the limits + retrievedFile.assertAttributeEquals("filename", "file.txt"); + } +}