nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject nifi git commit: NIFI-4257 - add custom WHERE clause in database fetch processors
Date Thu, 07 Sep 2017 18:47:43 GMT
Repository: nifi
Updated Branches:
  refs/heads/master ae30c7f35 -> c10ff574c


NIFI-4257 - add custom WHERE clause in database fetch processors

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2050


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c10ff574
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c10ff574
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c10ff574

Branch: refs/heads/master
Commit: c10ff574c4602fe05f5d1dae5eb0b1bd24026c02
Parents: ae30c7f
Author: Pierre Villard <pierre.villard.fr@gmail.com>
Authored: Wed Aug 2 15:22:31 2017 +0200
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Thu Sep 7 14:41:26 2017 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |   9 ++
 .../processors/standard/GenerateTableFetch.java |   7 +
 .../processors/standard/QueryDatabaseTable.java |  21 ++-
 .../standard/QueryDatabaseTableTest.java        | 153 +++++++++++++++++--
 .../standard/TestGenerateTableFetch.java        | 115 ++++++++++++++
 5 files changed, 288 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index ebe23d0..1f26976 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -146,6 +146,15 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             .expressionLanguageSupported(true)
             .build();
 
+    public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder()
+            .name("db-fetch-where-clause")
+            .displayName("Additional WHERE clause")
+            .description("A custom clause to be added in the WHERE condition when generating
SQL requests.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     // The delimiter to use when referencing qualified names (such as table@!@column in the
state map)

http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 3db2782..12278a3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -132,6 +132,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
         pds.add(PARTITION_SIZE);
+        pds.add(WHERE_CLAUSE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -185,6 +186,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
         final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
         final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
+        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
@@ -248,6 +250,11 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
                 }
             });
 
+            if(customWhereClause != null) {
+                // adding the custom WHERE clause (if defined) to the list of existing clauses.
+                maxValueClauses.add("(" + customWhereClause + ")");
+            }
+
             whereClause = StringUtils.join(maxValueClauses, " AND ");
             columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 8ae157a..0532b79 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -161,6 +161,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor
{
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
+        pds.add(WHERE_CLAUSE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -192,6 +193,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor
{
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
         final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
@@ -243,7 +245,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor
{
         List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                 ? null
                 : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList,
statePropertyMap);
+        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList,
customWhereClause, statePropertyMap);
         final StopWatch stopWatch = new StopWatch(true);
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
@@ -363,15 +365,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor
{
     }
 
     protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames,
List<String> maxValColumnNames,
-                              Map<String, String> stateMap) {
+                              String customWhereClause, Map<String, String> stateMap)
{
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name must be specified");
         }
         final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName,
columnNames, null, null, null, null));
 
+        List<String> whereClauses = new ArrayList<>();
         // Check state map for last max values
         if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames
!= null) {
-            List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
             IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
                 String colName = maxValColumnNames.get(index);
                 String maxValueKey = getStateKey(tableName, colName);
@@ -392,10 +394,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor
{
                     whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type,
maxValue, dbAdapter.getName()));
                 }
             });
-            if (!whereClauses.isEmpty()) {
-                query.append(" WHERE ");
-                query.append(StringUtils.join(whereClauses, " AND "));
-            }
+        }
+
+        if (customWhereClause != null) {
+            whereClauses.add("(" + customWhereClause + ")");
+        }
+
+        if (!whereClauses.isEmpty()) {
+            query.append(" WHERE ");
+            query.append(StringUtils.join(whereClauses, " AND "));
         }
 
         return query.toString();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index b015371..8a2319b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -45,13 +45,13 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.SimpleDateFormat;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
@@ -132,12 +132,12 @@ public class QueryDatabaseTableTest {
 
     @Test
     public void testGetQuery() throws Exception {
-        String query = processor.getQuery(dbAdapter, "myTable", null, null, null);
+        String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
         assertEquals("SELECT * FROM myTable", query);
-        query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null);
+        query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
         assertEquals("SELECT col1,col2 FROM myTable", query);
 
-        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"),
null);
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"),
null, null);
         assertEquals("SELECT * FROM myTable", query);
 
         Map<String, String> maxValues = new HashMap<>();
@@ -145,24 +145,24 @@ public class QueryDatabaseTableTest {
         StateManager stateManager = runner.getStateManager();
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER
+ "id", Types.INTEGER);
-        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"),
stateManager.getState(Scope.CLUSTER).toMap());
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"),
null, stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509", query);
 
         maxValues.put("date_created", "2016-03-07 12:34:56");
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER
+ "date_created", Types.TIMESTAMP);
-        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"),
stateManager.getState(Scope.CLUSTER).toMap());
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"),
null, stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07
12:34:56'", query);
 
         // Test Oracle strategy
         dbAdapter = new OracleDatabaseAdapter();
-        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"),
stateManager.getState(Scope.CLUSTER).toMap());
-        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp
'2016-03-07 12:34:56'", query);
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"),
"type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp
'2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testGetQueryNoTable() throws Exception {
-        processor.getQuery(dbAdapter, null, null, null, null);
+        processor.getQuery(dbAdapter, null, null, null, null, null);
     }
 
     @Test
@@ -587,7 +587,7 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
 
         // Run again with a cleaned state. Should get all rows split into batches
-        int ffCount = (int) Math.ceil((double)rowCount / 9D);
+        int ffCount = (int) Math.ceil(rowCount / 9D);
         runner.getStateManager().clear(Scope.CLUSTER);
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
@@ -717,6 +717,139 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testAddedRowsCustomWhereClause() throws ClassNotFoundException, SQLException,
InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet
support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20),
name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        //Remove Max Rows Per Flow File
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+
+        // Add a new row with a higher ID that does not match the custom WHERE clause and run,
no flowfiles should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be transferred because
no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+        assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow file will
be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be
returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(4, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than "NULL" (to test
that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state), all rows matching the custom WHERE clause should
be returned since the max value for scale has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(5, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow file will be
transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state), all rows matching the custom WHERE clause should
be returned since the max value for bignum has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(6, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for bignum than the max but a type that does not match
the custom WHERE clause, no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on,
bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
     private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in,
datumReader)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 8d549fd..d8791a5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -924,6 +924,121 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testAddedRowsWithCustomWhereClause() throws ClassNotFoundException, SQLException,
InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet
support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20),
name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setProperty(GenerateTableFetch.WHERE_CLAUSE, "type = 'male' OR type IS NULL");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+                + " AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add 3 new rows with a higher ID and run with a partition size of 1. Two flow files
should be transferred since only two of the new rows match the custom WHERE clause
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (5, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        // Verify first flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male'
OR type IS NULL)"
+                + " AND ID <= 5 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        // Verify second flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male'
OR type IS NULL)"
+                + " AND ID <= 5 ORDER BY ID OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on)
VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND (type = 'male'
OR type IS NULL)"
+                + " AND ID <= 6 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be
returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, type, name, scale, created_on");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 5); // 5 records with partition
size 1 means 5 generated FlowFiles
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE
(type = 'male' OR type IS NULL)"
+                + " AND name <= 'Mr. NiFi' ORDER BY name FETCH NEXT 1 ROWS ONLY", new
String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE
(type = 'male' OR type IS NULL)"
+                + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 1 ROWS FETCH NEXT 1 ROWS
ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+        assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE
(type = 'male' OR type IS NULL)"
+                + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 1 ROWS
ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
+        assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE
(type = 'male' OR type IS NULL)"
+                + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 3 ROWS FETCH NEXT 1 ROWS
ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(4);
+        assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE
(type = 'male' OR type IS NULL)"
+                + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 1 ROWS
ONLY", new String(flowFile.toByteArray()));
+
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
+        assertEquals("id, type, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
+        assertEquals("(type = 'male' OR type IS NULL) AND name <= 'Mr. NiFi'", flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
+        assertEquals("1", flowFile.getAttribute("generatetablefetch.limit"));
+        assertEquals("4", flowFile.getAttribute("generatetablefetch.offset"));
+
+        runner.clearTransferState();
+    }
 
     /**
      * Simple implementation only for GenerateTableFetch processor testing.


Mime
View raw message