Repository: nifi
Updated Branches:
refs/heads/master f772f2f09 -> 59d3c6419
NIFI-4636 Updated GetMongo to support expression language on limit, results per flowfile and
batch fields.
NIFI-4636: Added unit tests
Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #2289
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/59d3c641
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/59d3c641
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/59d3c641
Branch: refs/heads/master
Commit: 59d3c64195d59b659e7fa70026a8d8c35bf78c8b
Parents: f772f2f
Author: Mike Thomsen <mikerthomsen@gmail.com>
Authored: Thu Nov 23 16:52:41 2017 -0500
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Mon Dec 11 12:49:42 2017 -0500
----------------------------------------------------------------------
.../apache/nifi/processors/mongodb/GetMongo.java | 9 ++++++---
.../nifi/processors/mongodb/GetMongoTest.java | 16 +++++++++++++++-
2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/59d3c641/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 7c78358..8988245 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -107,12 +107,14 @@ public class GetMongo extends AbstractMongoProcessor {
.name("Limit")
.description("The maximum number of elements to return")
.required(false)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of elements returned from the server in one batch")
.required(false)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
@@ -120,6 +122,7 @@ public class GetMongo extends AbstractMongoProcessor {
.displayName("Results Per FlowFile")
.description("How many results to put into a flowfile at once. The whole body will
be treated as a JSON array of results.")
.required(false)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@@ -241,10 +244,10 @@ public class GetMongo extends AbstractMongoProcessor {
it.sort(sort);
}
if (context.getProperty(LIMIT).isSet()) {
- it.limit(context.getProperty(LIMIT).asInteger());
+ it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions().asInteger());
}
if (context.getProperty(BATCH_SIZE).isSet()) {
- it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
+ it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
}
final MongoCursor<Document> cursor = it.iterator();
@@ -252,7 +255,7 @@ public class GetMongo extends AbstractMongoProcessor {
try {
FlowFile flowFile = null;
if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
- int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
+ int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
List<Document> batch = new ArrayList<>();
while (cursor.hasNext()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/59d3c641/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
index feef93f..2de0c97 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -228,7 +228,8 @@ public class GetMongoTest {
@Test
public void testLimit() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
- runner.setProperty(GetMongo.LIMIT, "1");
+ runner.setProperty(GetMongo.LIMIT, "${limit}");
+ runner.setVariable("limit", "1");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
@@ -238,7 +239,20 @@ public class GetMongoTest {
@Test
public void testResultsPerFlowfile() throws Exception {
+ runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}");
+ runner.setVariable("results.per.flowfile", "2");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+ Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
+ Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()),
"application/json");
+ }
+
+ @Test
+ public void testBatchSize() throws Exception {
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
+ runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}");
+ runner.setVariable("batch.size", "1");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
|