Return-Path: X-Original-To: apmail-nifi-commits-archive@minotaur.apache.org Delivered-To: apmail-nifi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3358318BFB for ; Thu, 30 Apr 2015 23:41:48 +0000 (UTC) Received: (qmail 38707 invoked by uid 500); 30 Apr 2015 23:41:48 -0000 Delivered-To: apmail-nifi-commits-archive@nifi.apache.org Received: (qmail 38602 invoked by uid 500); 30 Apr 2015 23:41:48 -0000 Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.incubator.apache.org Delivered-To: mailing list commits@nifi.incubator.apache.org Received: (qmail 38581 invoked by uid 99); 30 Apr 2015 23:41:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 23:41:48 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.191.145.13] (HELO mx1-us-west.apache.org) (54.191.145.13) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 23:41:40 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 56BE1256BF for ; Thu, 30 Apr 2015 23:41:20 +0000 (UTC) Received: (qmail 38356 invoked by uid 99); 30 Apr 2015 23:41:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 23:41:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F0EDE3A61; Thu, 30 Apr 2015 23:41:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: markap14@apache.org To: commits@nifi.incubator.apache.org Date: Thu, 30 Apr 2015 23:41:23 -0000 Message-Id: In-Reply-To: <2386c23f9d7a45d5b3a4597fe49c8882@git.apache.org> References: <2386c23f9d7a45d5b3a4597fe49c8882@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] incubator-nifi git commit: NIFI-559: Initial implementation of DuplicateFlowFile X-Virus-Checked: Checked by ClamAV on apache.org NIFI-559: Initial implementation of DuplicateFlowFile Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/21c5c48c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/21c5c48c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/21c5c48c Branch: refs/heads/develop Commit: 21c5c48cabe8631a173fb2513f7e224bac7a59ab Parents: e7954cf Author: Mark Payne Authored: Thu Apr 30 13:13:22 2015 -0400 Committer: Mark Payne Committed: Thu Apr 30 13:13:22 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/DuplicateFlowFile.java | 81 ++++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestDuplicateFlowFile.java | 36 +++++++++ 3 files changed, 118 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java new file mode 100644 index 0000000..7400821 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +@EventDriven +@SupportsBatching +@Tags({"test", "load", "duplicate"}) +@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile") +public class DuplicateFlowFile extends AbstractProcessor { + + static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder() + .name("Number of Copies") + .description("Specifies how many copies of each incoming FlowFile will be made") + .required(true) + .expressionLanguageSupported(false) + .defaultValue("100") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The original FlowFile and all copies will be sent to this relationship") + .build(); + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(NUM_COPIES); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) { + final FlowFile copy = session.clone(flowFile); + session.transfer(copy, REL_SUCCESS); + } + + session.transfer(flowFile, REL_SUCCESS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 7fbd781..17339bc 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad +org.apache.nifi.processors.standard.DuplicateFlowFile org.apache.nifi.processors.standard.EncryptContent org.apache.nifi.processors.standard.EvaluateJsonPath org.apache.nifi.processors.standard.EvaluateRegularExpression http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java new file mode 100644 index 0000000..82fee1b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestDuplicateFlowFile { + + @Test + public void test() { + final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); + + runner.enqueue("hello".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); + } + +}