nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-4522: Add SQL Statement property to PutSQL
Date Fri, 27 Oct 2017 07:19:07 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 1625f719e -> 856dedab1


NIFI-4522: Add SQL Statement property to PutSQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2225.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/856dedab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/856dedab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/856dedab

Branch: refs/heads/master
Commit: 856dedab122f767cf52b18d28b0adf3ba3c09c70
Parents: 1625f71
Author: Matthew Burgess <mattyb149@apache.org>
Authored: Wed Oct 25 09:53:16 2017 -0400
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Fri Oct 27 09:17:51 2017 +0200

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 26 +++++++++--
 .../nifi/processors/standard/TestPutSQL.java    | 46 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/856dedab/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index f75ccaa..b50dcd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -139,6 +139,19 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
             .identifiesControllerService(DBCPService.class)
             .required(true)
             .build();
+
+    static final PropertyDescriptor SQL_STATEMENT = new PropertyDescriptor.Builder()
+            .name("putsql-sql-statement")
+            .displayName("SQL Statement")
+            .description("The SQL statement to execute. The statement can be empty, a constant
value, or built from attributes "
+                    + "using Expression Language. If this property is specified, it will
be used regardless of the content of "
+                    + "incoming flowfiles. If this property is empty, the content of the
incoming flow file is expected "
+                    + "to contain a valid SQL statement, to be issued by the processor to
the database.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
             .name("Support Fragmented Transactions")
             .description("If true, when a FlowFile is consumed by this Processor, the Processor
will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
@@ -197,6 +210,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONNECTION_POOL);
+        properties.add(SQL_STATEMENT);
         properties.add(SUPPORT_TRANSACTIONS);
         properties.add(TRANSACTION_TIMEOUT);
         properties.add(BATCH_SIZE);
@@ -269,7 +283,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
         groups.add(fragmentedEnclosure);
 
         for (final FlowFile flowFile : flowFiles) {
-            final String sql = getSQL(session, flowFile);
+            final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
+                    ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
+                    : getSQL(session, flowFile);
 
             final StatementFlowFileEnclosure enclosure = sqlToEnclosure
                     .computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql));
@@ -280,7 +296,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
 
     private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn,
flowFiles, groups, sqlToEnclosure, result) -> {
         for (final FlowFile flowFile : flowFiles) {
-            final String sql = getSQL(session, flowFile);
+            final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
+                    ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
+                    : getSQL(session, flowFile);
 
             // Get or create the appropriate PreparedStatement to use.
             final StatementFlowFileEnclosure enclosure = sqlToEnclosure
@@ -304,7 +322,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
 
     private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles,
groups, sqlToEnclosure, result) -> {
         for (final FlowFile flowFile : flowFiles) {
-            final String sql = getSQL(session, flowFile);
+            final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
+                    ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
+                    : getSQL(session, flowFile);
 
             // Get or create the appropriate PreparedStatement to use.
             final StatementFlowFileEnclosure enclosure = sqlToEnclosure

http://git-wip-us.apache.org/repos/asf/nifi/blob/856dedab/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 5a2909b..b804447 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -1412,6 +1412,52 @@ public class TestPutSQL {
         runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
     }
 
+    @Test
+    public void testStatementsFromProperty() throws InitializationException, ProcessException,
SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES
(${row.id}, 'Mark', 84)");
+
+        recreateTable("PERSONS", createPersons);
+
+        runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>()
{{
+            put("row.id", "1");
+        }});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+
+        runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE
ID=${row.id}");
+        runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>()
{{
+            put("row.id", "1");
+        }});
+        runner.run();
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("George", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
     /**
      * Simple implementation only for testing purposes
      */


Mime
View raw message