nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-4684 - Added SQL Parameter Attribute Prefix property to ConvertJSONToSQL
Date Tue, 19 Dec 2017 09:32:59 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 17718940d -> 612675e42


NIFI-4684 - Added SQL Parameter Attribute Prefix property to ConvertJSONToSQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2333.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/612675e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/612675e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/612675e4

Branch: refs/heads/master
Commit: 612675e4283a22481ee76a0ad81a092f500150a9
Parents: 1771894
Author: Matthew Burgess <mattyb149@apache.org>
Authored: Fri Dec 8 22:01:25 2017 -0500
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Tue Dec 19 10:32:52 2017 +0100

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 71 ++++++++++++--------
 .../standard/TestConvertJSONToSQL.java          | 29 ++++++++
 2 files changed, 73 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/612675e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 00db3b5..638ec9d 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -83,9 +83,10 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         + "relationship and the SQL is routed to the 'sql' relationship.")
 @WritesAttributes({
         @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that
is routed to 'sql' to 'text/plain'."),
-        @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute
of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
-        @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set
for this database, specifies the name of the catalog that the SQL statement will update. "
-                + "If no catalog is used, this attribute will not be added."),
+        @WritesAttribute(attribute = "<sql>.table", description = "Sets the <sql>.table
attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by
the SQL statement. "
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter
Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.catalog", description="If the Catalog name
is set for this database, specifies the name of the catalog that the SQL statement will update.
"
+                + "If no catalog is used, this attribute will not be added. The prefix for
this attribute ('sql', e.g.) is determined by the SQL Parameter Attribute Prefix property."),
         @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed
to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the
same incoming "
                 + "FlowFile if the incoming FlowFile is a JSON Array) will have the same
value for the fragment.identifier attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles
that were produced for same incoming FlowFile. This can be used in conjunction with the "
@@ -93,12 +94,14 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile
in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This
can be "
                 + "used in conjunction with the fragment.identifier and fragment.count attributes
to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL
"
                 + "FlowFiles were produced"),
-        @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements
are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters "
-                + "to use are stored in attributes named sql.args.1.type, sql.args.2.type,
sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
-                + "Generally, this is useful only for software to read and interpret but
is added so that a processor such as PutSQL can understand how to interpret the values."),
-        @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements
are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters "
+        @WritesAttribute(attribute="<sql>.args.N.type", description="The output SQL
statements are parametrized in order to avoid SQL Injection Attacks. The types of the Parameters
"
+                + "to use are stored in attributes named <sql>.args.1.type, <sql>.args.2.type,
<sql>.args.3.type, and so on. The type is a number representing a JDBC Type constant.
"
+                + "Generally, this is useful only for software to read and interpret but
is added so that a processor such as PutSQL can understand how to interpret the values. "
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter
Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.args.N.value", description="The output SQL
statements are parametrized in order to avoid SQL Injection Attacks. The values of the Parameters
"
                 + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value,
sql.args.3.value, and so on. Each of these attributes has a corresponding "
-                + "sql.args.N.type attribute that indicates how the value should be interpreted
when inserting it into the database.")
+                + "<sql>.args.N.type attribute that indicates how the value should
be interpreted when inserting it into the database."
+                + "The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter
Attribute Prefix property.")
 })
 public class ConvertJSONToSQL extends AbstractProcessor {
     private static final String UPDATE_TYPE = "UPDATE";
@@ -201,6 +204,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor SQL_PARAM_ATTR_PREFIX = new PropertyDescriptor.Builder()
+            .name("jts-sql-param-attr-prefix")
+            .displayName("SQL Parameter Attribute Prefix")
+            .description("The string to be prepended to the outgoing flow file attributes,
such as <sql>.args.1.value, where <sql> is replaced with the specified value")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .required(true)
+            .defaultValue("sql")
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("When a FlowFile is converted to SQL, the original JSON FlowFile
is routed to this relationship")
@@ -238,6 +251,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(UPDATE_KEY);
         properties.add(QUOTED_IDENTIFIERS);
         properties.add(QUOTED_TABLE_IDENTIFIER);
+        properties.add(SQL_PARAM_ATTR_PREFIX);
         return properties;
     }
 
@@ -287,6 +301,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         // Quote table name?
         final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
 
+        // Attribute prefix
+        final String attributePrefix = context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
+
         // get the database schema from the cache, if one exists. We do this in a synchronized
block, rather than
         // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with
a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do this in order
to avoid filling the
@@ -364,13 +381,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
                 if (INSERT_TYPE.equals(statementType)) {
                     sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames,
ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName, attributePrefix);
                 } else if (UPDATE_TYPE.equals(statementType)) {
                     sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema,
translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName, attributePrefix);
                 } else {
                     sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames,
ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames,
quoteTableName, attributePrefix);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement due to {};
routing to failure",
@@ -391,13 +408,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             });
 
             attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-            attributes.put("sql.table", tableName);
+            attributes.put(attributePrefix + ".table", tableName);
             attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
             attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
             attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
 
             if (catalog != null) {
-                attributes.put("sql.catalog", catalog);
+                attributes.put(attributePrefix + ".catalog", catalog);
             }
 
             sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
@@ -420,7 +437,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateInsert(final JsonNode rootNode, final Map<String, String>
attributes, final String tableName,
                                   final TableSchema schema, final boolean translateFieldNames,
final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName, final String attributePrefix) {
 
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode,
translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
@@ -449,7 +466,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" (");
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding
the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type"
attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type
of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -474,13 +491,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 }
 
                 final int sqlType = desc.getDataType();
-                attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+                attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
                 final Integer colSize = desc.getColumnSize();
                 final JsonNode fieldNode = rootNode.get(fieldName);
                 if (!fieldNode.isNull()) {
                     String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
-                    attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                    attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
                 }
             }
         }
@@ -562,7 +579,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, String>
attributes, final String tableName, final String updateKeys,
                                   final TableSchema schema, final boolean translateFieldNames,
final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName, final String attributePrefix) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -612,7 +629,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         }
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding
the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type"
attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type
of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -648,14 +665,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
 
             final JsonNode fieldNode = rootNode.get(fieldName);
             if (!fieldNode.isNull()) {
                 String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
-                attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
             }
         }
 
@@ -693,14 +710,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             }
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             String fieldValue = rootNode.get(fieldName).asText();
             if (colSize != null && fieldValue.length() > colSize) {
                 fieldValue = fieldValue.substring(0, colSize);
             }
-            attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+            attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
         }
 
         return sqlBuilder.toString();
@@ -708,7 +725,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateDelete(final JsonNode rootNode, final Map<String, String>
attributes, final String tableName,
                                   final TableSchema schema, final boolean translateFieldNames,
final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames,
boolean quoteTableName, final String attributePrefix) {
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode,
translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
@@ -737,7 +754,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" WHERE ");
 
         // iterate over all of the elements in the JSON, building the SQL statement by adding
the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type"
attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the type
of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -763,7 +780,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 sqlBuilder.append(" = ?");
 
                 final int sqlType = desc.getDataType();
-                attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+                attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
 
                 final Integer colSize = desc.getColumnSize();
                 final JsonNode fieldNode = rootNode.get(fieldName);
@@ -772,7 +789,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                     if (colSize != null && fieldValue.length() > colSize) {
                         fieldValue = fieldValue.substring(0, colSize);
                     }
-                    attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                    attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/612675e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 2332e28..8a03a81 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -874,6 +874,35 @@ public class TestConvertJSONToSQL {
         out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE =
?");
     }
 
+    @Test
+    public void testAttributePrefix() throws InitializationException, ProcessException, SQLException,
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
+        runner.setProperty(ConvertJSONToSQL.SQL_PARAM_ATTR_PREFIX, "hiveql");
+
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(),
"1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("hiveql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.1.value", "1");
+        out.assertAttributeEquals("hiveql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("hiveql.args.2.value", "Mark");
+        out.assertAttributeEquals("hiveql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES
(?, ?, ?)");
+    }
+
     /**
      * Simple implementation only for testing purposes
      */


Mime
View raw message