nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject nifi git commit: NIFI-3484: GenerateTableFetch Should Allow for Right Boundary
Date Thu, 07 Sep 2017 15:13:36 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 527ce0b4e -> ae30c7f35


NIFI-3484: GenerateTableFetch Should Allow for Right Boundary

Fixed a checkstyle issue, added a unit test demonstrating the data duplication issue, and removed the right-boundary property.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2091


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae30c7f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae30c7f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae30c7f3

Branch: refs/heads/master
Commit: ae30c7f35013e1faf26c6bd3af122362fa4b361e
Parents: 527ce0b
Author: patricker <patricker@gmail.com>
Authored: Wed Feb 15 12:11:56 2017 -0700
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Thu Sep 7 11:05:38 2017 -0400

----------------------------------------------------------------------
 .../processors/standard/GenerateTableFetch.java |  62 +++-
 .../standard/TestGenerateTableFetch.java        | 283 +++++++++++++++++--
 2 files changed, 309 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ae30c7f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index f5407da..3db2782 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -237,23 +237,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
             // For each maximum-value column, get a WHERE filter and a MAX(column) alias
             IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
                 String colName = maxValueColumnNameList.get(index);
+
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
-                final String fullyQualifiedStateKey = getStateKey(tableName, colName);
-                String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
-                if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
-                    // If the table name is static and the fully-qualified key was not found,
try just the column name
-                    maxValue = statePropertyMap.get(getStateKey(null, colName));
-                }
+                String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                 if (!StringUtils.isEmpty(maxValue)) {
-                    Integer type = columnTypeMap.get(fullyQualifiedStateKey);
-                    if (type == null && !isDynamicTableName) {
-                        // If the table name is static and the fully-qualified key was not
found, try just the column name
-                        type = columnTypeMap.get(getStateKey(null, colName));
-                    }
-                    if (type == null) {
-                        // This shouldn't happen as we are populating columnTypeMap when
the processor is scheduled or when the first maximum is observed
-                        throw new IllegalArgumentException("No column type found for: " +
colName);
-                    }
+                    Integer type = getColumnType(tableName, colName);
+
                     // Add a condition for the WHERE clause
                     maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type,
maxValue, dbAdapter.getName()));
                 }
@@ -318,6 +307,23 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
                     throw new SQLException("No rows returned from metadata query: " + selectQuery);
                 }
 
+                // for each maximum-value column get a right bounding WHERE condition
+                IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+                    String colName = maxValueColumnNameList.get(index);
+
+                    maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
+                    String maxValue = getColumnStateMaxValue(tableName, statePropertyMap,
colName);
+                    if (!StringUtils.isEmpty(maxValue)) {
+                        Integer type = getColumnType(tableName, colName);
+
+                        // Add a condition for the WHERE clause
+                        maxValueClauses.add(colName + " <= " + getLiteralByType(type,
maxValue, dbAdapter.getName()));
+                    }
+                });
+
+                //Update WHERE list to include new right hand boundaries
+                whereClause = StringUtils.join(maxValueClauses, " AND ");
+
                 final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize)
+ (rowCount % partitionSize == 0 ? 0 : 1);
 
                 // Generate SQL statements to read "pages" of data
@@ -377,4 +383,30 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor
{
             context.yield();
         }
     }
+
+    private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap,
String colName) {
+        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+        String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
+        if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
+            // If the table name is static and the fully-qualified key was not found, try
just the column name
+            maxValue = statePropertyMap.get(getStateKey(null, colName));
+        }
+
+        return maxValue;
+    }
+
+    private Integer getColumnType(String tableName, String colName) {
+        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+        Integer type = columnTypeMap.get(fullyQualifiedStateKey);
+        if (type == null && !isDynamicTableName) {
+            // If the table name is static and the fully-qualified key was not found, try
just the column name
+            type = columnTypeMap.get(getStateKey(null, colName));
+        }
+        if (type == null) {
+            // This shouldn't happen as we are populating columnTypeMap when the processor
is scheduled or when the first maximum is observed
+            throw new IllegalArgumentException("No column type found for: " + colName);
+        }
+
+        return type;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae30c7f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 76bc1f0..8d549fd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -139,7 +139,7 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 10000 ROWS
ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH
NEXT 10000 ROWS ONLY", query);
         ResultSet resultSet = stmt.executeQuery(query);
         // Should be three records
         assertTrue(resultSet.next());
@@ -164,7 +164,7 @@ public class TestGenerateTableFetch {
         // Verify first flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH
NEXT 2 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID FETCH NEXT 2 ROWS ONLY", query);
         resultSet = stmt.executeQuery(query);
         // Should be two records
         assertTrue(resultSet.next());
@@ -174,7 +174,7 @@ public class TestGenerateTableFetch {
         // Verify second flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
         query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET
2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
         resultSet = stmt.executeQuery(query);
         // Should be one record
         assertTrue(resultSet.next());
@@ -187,7 +187,7 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 ORDER BY ID FETCH
NEXT 2 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND ID <= 6 ORDER
BY ID FETCH NEXT 2 ROWS ONLY", query);
         resultSet = stmt.executeQuery(query);
         // Should be one record
         assertTrue(resultSet.next());
@@ -201,16 +201,16 @@ public class TestGenerateTableFetch {
         runner.run();
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition
size 2 means 4 generated FlowFiles
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
-        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY
name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name
<= 'Mr. NiFi' ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
-        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY
name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name
<= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
-        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY
name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name
<= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
-        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY
name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name
<= 'Mr. NiFi' ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
         assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
         assertEquals("id, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
-        assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("name <= 'Mr. NiFi'", flowFile.getAttribute("generatetablefetch.whereClause"));
         assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
         assertEquals("2", flowFile.getAttribute("generatetablefetch.limit"));
         assertEquals("6", flowFile.getAttribute("generatetablefetch.offset"));
@@ -219,6 +219,190 @@ public class TestGenerateTableFetch {
     }
 
     @Test
+    public void testAddedRowsRightBounded() throws ClassNotFoundException, SQLException,
InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet
support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100),
scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH
NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be three records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files
should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        // Verify first flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        // Verify second flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND ID <= 6 ORDER
BY ID FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be
returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition
size 2 means 4 generated FlowFiles
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER
BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER
BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER
BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER
BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testAddedRowsTimestampRightBounded() throws ClassNotFoundException, SQLException,
InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet
support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100),
scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on <= '2010-01-01
00:00:00.0' ORDER BY created_on FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be three records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add 5 new rows, 3 with higher timestamps, 2 with a lower timestamp.
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(4, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(5, 'Marty Johnson', 15.0, '2011-01-01 02:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(6, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(7, 'James Johnson', 16.0, '2011-01-01 04:23:34.236')");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        // Verify first flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2010-01-01
00:00:00.0' AND "
+                + "created_on <= '2011-01-01 04:23:34.236' ORDER BY created_on FETCH NEXT
2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        // Verify second flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2010-01-01
00:00:00.0' AND "
+                + "created_on <= '2011-01-01 04:23:34.236' ORDER BY created_on OFFSET
2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Add a new row with a higher created_on and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(8, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2011-01-01
04:23:34.236' AND created_on <= '2012-01-01 03:23:34.234' ORDER BY created_on FETCH NEXT
2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testOnePartition() throws ClassNotFoundException, SQLException, InitializationException,
IOException {
 
         // load test data to database
@@ -244,7 +428,7 @@ public class TestGenerateTableFetch {
 
         runner.run();
         runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
-        runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT
* FROM TEST_QUERY_DB_TABLE ORDER BY ID");
+        runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT
* FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID");
         runner.clearTransferState();
     }
 
@@ -410,7 +594,7 @@ public class TestGenerateTableFetch {
 
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 AND id <= 1 ORDER
BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
     }
 
     @Test
@@ -451,10 +635,10 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         // Note there is no WHERE clause here. Because we are using dynamic tables, the old
state key/value is not retrieved
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS
ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
         assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
         assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
-        assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("id <= 1", flowFile.getAttribute("generatetablefetch.whereClause"));
         assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
         assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
         assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
@@ -470,10 +654,10 @@ public class TestGenerateTableFetch {
 
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER
BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
         assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
         assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
-        assertEquals("id > 1", flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("id > 1 AND id <= 2", flowFile.getAttribute("generatetablefetch.whereClause"));
         assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
         assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
         assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
@@ -516,7 +700,7 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         // Note there is no WHERE clause here. Because we are using dynamic tables, the old
state key/value is not retrieved
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS
ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
 
         runner.clearTransferState();
         stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
@@ -529,7 +713,7 @@ public class TestGenerateTableFetch {
 
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER
BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
     }
 
     @Test
@@ -570,7 +754,7 @@ public class TestGenerateTableFetch {
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         // Note there is no WHERE clause here. Because we are using dynamic tables (i.e.
Expression Language,
         // even when not referring to flow file attributes), the old state key/value is not
retrieved
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS
ONLY", new String(flowFile.toByteArray()));
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH
NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
     }
 
     @Test
@@ -641,7 +825,7 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 ORDER BY ID FETCH
NEXT 10000 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER
BY ID FETCH NEXT 10000 ROWS ONLY", query);
         ResultSet resultSet = stmt.executeQuery(query);
         // Should be one record (the initial max value skips the first two)
         assertTrue(resultSet.next());
@@ -665,7 +849,7 @@ public class TestGenerateTableFetch {
         // Verify first flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH
NEXT 2 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID FETCH NEXT 2 ROWS ONLY", query);
         resultSet = stmt.executeQuery(query);
         // Should be two records
         assertTrue(resultSet.next());
@@ -675,7 +859,7 @@ public class TestGenerateTableFetch {
         // Verify second flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
         query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET
2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER
BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
         resultSet = stmt.executeQuery(query);
         // Should be one record
         assertTrue(resultSet.next());
@@ -683,6 +867,63 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testNoDuplicateWithRightBounded() throws ClassNotFoundException, SQLException,
InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet
support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100),
scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+
+        // we now insert a row before the query issued by GFT is actually executed by, let's
say, ExecuteSQL processor
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES
(5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+
+        ResultSet resultSet = stmt.executeQuery(query);
+        int numberRecordsFirstExecution = 0; // Should be three records
+        while(resultSet.next()) {
+            numberRecordsFirstExecution++;
+        }
+        runner.clearTransferState();
+
+        // Run again
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+
+        resultSet = stmt.executeQuery(query);
+        int numberRecordsSecondExecution = 0; // Should be three records
+        while(resultSet.next()) {
+            numberRecordsSecondExecution++;
+        }
+
+        // Without the right-hand boundary, rows 3-5 would be read by both queries and the total would be 9.
+        assertEquals(numberRecordsFirstExecution + numberRecordsSecondExecution, 6);
+
+        runner.clearTransferState();
+    }
+
 
     /**
      * Simple implementation only for GenerateTableFetch processor testing.


Mime
View raw message