nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-4612: Allow AvroSchemaRegistry to disable name validation
Date Tue, 21 Nov 2017 10:15:20 GMT
Repository: nifi
Updated Branches:
  refs/heads/master ff5325b92 -> 8f2501ffa


NIFI-4612: Allow AvroSchemaRegistry to disable name validation

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2275.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8f2501ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8f2501ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8f2501ff

Branch: refs/heads/master
Commit: 8f2501ffac68885c2baee4b313ff016454e038a3
Parents: ff5325b
Author: Matthew Burgess <mattyb149@apache.org>
Authored: Thu Nov 16 11:49:02 2017 -0500
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Tue Nov 21 11:14:19 2017 +0100

----------------------------------------------------------------------
 .../services/AvroSchemaRegistry.java            | 82 +++++++++++++++++---
 .../services/TestAvroSchemaRegistry.java        | 49 +++++++++++-
 2 files changed, 120 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8f2501ff/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index 169d79d..785d729 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -17,8 +17,13 @@
 package org.apache.nifi.schemaregistry.services;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,8 +37,12 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -49,24 +58,73 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     private final Map<String, String> schemaNameToSchemaMap;
     private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
 
+    static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+            .name("avro-reg-validated-field-names")
+            .displayName("Validate Field Names")
+            .description("Whether or not to validate the field names in the Avro schema based on Avro naming rules. If set to true, all field names must be valid Avro names, "
+                    + "which must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_]. If set to false, no validation will be performed on the field names.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+
     public AvroSchemaRegistry() {
         this.schemaNameToSchemaMap = new HashMap<>();
     }
 
     @Override
+    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+        super.init(config);
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(VALIDATE_FIELD_NAMES);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+    }
+
+    @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        if (newValue == null) {
-            recordSchemas.remove(descriptor.getName());
-        } else {
+        if(descriptor.isDynamic()) {
+            // Dynamic property = schema, validate it
+            if (newValue == null) {
+                recordSchemas.remove(descriptor.getName());
+            } else {
+                try {
+                    // Use a non-strict parser here, a strict parse can be done (if specified) in customValidate().
+                    final Schema avroSchema = new Schema.Parser().setValidate(false).parse(newValue);
+                    final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
+                    final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
+                    recordSchemas.put(descriptor.getName(), recordSchema);
+                } catch (final Exception e) {
+                    // not a problem - the service won't be valid and the validation message will indicate what is wrong.
+                }
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = new HashSet<>();
+        boolean strict = validationContext.getProperty(VALIDATE_FIELD_NAMES).asBoolean();
+
+        // Iterate over dynamic properties, validating the schemas, and adding results
+        validationContext.getProperties().entrySet().stream().filter(entry -> entry.getKey().isDynamic()).forEach(entry -> {
+            String subject = entry.getKey().getDisplayName();
+            String input = entry.getValue();
+
             try {
-                final Schema avroSchema = new Schema.Parser().parse(newValue);
-                final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
-                final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
-                recordSchemas.put(descriptor.getName(), recordSchema);
+                final Schema avroSchema = new Schema.Parser().setValidate(strict).parse(input);
+                AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY);
             } catch (final Exception e) {
-                // not a problem - the service won't be valid and the validation message will indicate what is wrong.
+                results.add(new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(false)
+                        .explanation("Not a valid Avro Schema: " + e.getMessage())
+                        .build());
             }
-        }
+        });
+        return results;
     }
 
     @Override
@@ -111,13 +169,17 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
             .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
     }
 
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
             .name(propertyDescriptorName)
             .required(false)
-            .addValidator(new AvroSchemaValidator())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .dynamic(true)
             .expressionLanguageSupported(true)
             .build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f2501ff/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
index 9121f04..67a0959 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -17,13 +17,18 @@
 package org.apache.nifi.schemaregistry.services;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.junit.Assert;
@@ -54,7 +59,6 @@ public class TestAvroSchemaRegistry {
         when(configContext.getProperties()).thenReturn(properties);
         AvroSchemaRegistry delegate = new AvroSchemaRegistry();
         delegate.enable(configContext);
-
         String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
         assertEquals(fooSchemaText, locatedSchemaText);
         try {
@@ -65,4 +69,47 @@ public class TestAvroSchemaRegistry {
 
         delegate.close();
     }
+
+    @Test
+    public void validateStrictAndNonStrictSchemaRegistrationFromDynamicProperties() throws Exception {
+        String schemaName = "fooSchema";
+        ConfigurationContext configContext = mock(ConfigurationContext.class);
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
+                .name(schemaName)
+                .dynamic(true)
+                .build();
+        // NOTE: name of record and name of first field are not Avro-compliant, verified below
+        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"$User\", "
+                + "\"fields\": [ " + "{\"name\": \"@name\", \"type\": [\"string\", \"null\"]}, "
+                + "{\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]}, "
+                + "{\"name\": \"foo\",  \"type\": [\"int\", \"null\"]}, "
+                + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
+        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
+                .name("barSchema")
+                .dynamic(false)
+                .build();
+        properties.put(fooSchema, fooSchemaText);
+        properties.put(barSchema, "");
+        AvroSchemaRegistry delegate = new AvroSchemaRegistry();
+        delegate.getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDisplayName()));
+        when(configContext.getProperties()).thenReturn(properties);
+
+        ValidationContext validationContext = mock(ValidationContext.class);
+        when(validationContext.getProperties()).thenReturn(properties);
+        PropertyValue propertyValue = mock(PropertyValue.class);
+        when(validationContext.getProperty(AvroSchemaRegistry.VALIDATE_FIELD_NAMES)).thenReturn(propertyValue);
+
+        // Strict parsing
+        when(propertyValue.asBoolean()).thenReturn(true);
+        Collection<ValidationResult> results = delegate.customValidate(validationContext);
+        assertTrue(results.stream().anyMatch(result -> !result.isValid()));
+
+        // Non-strict parsing
+        when(propertyValue.asBoolean()).thenReturn(false);
+        results = delegate.customValidate(validationContext);
+        results.forEach(result -> assertTrue(result.isValid()));
+
+        delegate.close();
+    }
 }


Mime
View raw message