kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6511; Corrected connect list/map parsing logic (#4516)
Date Tue, 13 Feb 2018 19:44:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a68a8b6  KAFKA-6511; Corrected connect list/map parsing logic (#4516)
a68a8b6 is described below

commit a68a8b6b6cc47d488ada99e4bfe87cc7467621bb
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Tue Feb 13 13:44:12 2018 -0600

    KAFKA-6511; Corrected connect list/map parsing logic (#4516)
    
    Corrected the parsing of invalid list values. A list can only be parsed if it contains elements that have a common type, and a map can only be parsed if it contains keys with a common type and values with a common type.
    
    Reviewers: Arjun Satish <arjun@confluent.io>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/connect/data/Values.java |  29 ++++--
 .../org/apache/kafka/connect/data/ValuesTest.java  | 112 +++++++++++++++++++++
 2 files changed, 132 insertions(+), 9 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 41040c7..05248ef 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -749,10 +749,16 @@ public class Values {
                 Schema elementSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(ARRAY_END_DELIMITER)) {
-                        Schema listSchema = elementSchema == null ? null : SchemaBuilder.array(elementSchema).schema();
+                        Schema listSchema = null;
+                        if (elementSchema != null) {
+                            listSchema = SchemaBuilder.array(elementSchema).schema();
+                        }
                         result = alignListEntriesWithSchema(listSchema, result);
                         return new SchemaAndValue(listSchema, result);
                     }
+                    if (parser.canConsume(COMMA_DELIMITER)) {
+                        throw new DataException("Unable to parse an empty array element: " + parser.original());
+                    }
                     SchemaAndValue element = parse(parser, true);
                     elementSchema = commonSchemaFor(elementSchema, element);
                     result.add(element.value());
@@ -760,9 +766,9 @@ public class Values {
                 }
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
-                    throw new DataException("Malformed array: missing element after ','");
+                    throw new DataException("Array is missing element after ',': " + parser.original());
                 }
-                throw new DataException("Malformed array: missing terminating ']'");
+                throw new DataException("Array is missing terminating ']': " + parser.original());
             }
 
             if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
@@ -771,17 +777,22 @@ public class Values {
                 Schema valueSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(MAP_END_DELIMITER)) {
-                        Schema mapSchema =
-                                keySchema == null || valueSchema == null ? null : SchemaBuilder.map(keySchema, valueSchema).schema();
+                        Schema mapSchema = null;
+                        if (keySchema != null && valueSchema != null) {
+                            mapSchema = SchemaBuilder.map(keySchema, valueSchema).schema();
+                        }
                         result = alignMapKeysAndValuesWithSchema(mapSchema, result);
                         return new SchemaAndValue(mapSchema, result);
                     }
+                    if (parser.canConsume(COMMA_DELIMITER)) {
+                        throw new DataException("Unable to parse a map entry has no key or value: " + parser.original());
+                    }
                     SchemaAndValue key = parse(parser, true);
                     if (key == null || key.value() == null) {
-                        throw new DataException("Malformed map entry: null key");
+                        throw new DataException("Map entry may not have a null key: " + parser.original());
                     }
                     if (!parser.canConsume(ENTRY_DELIMITER)) {
-                        throw new DataException("Malformed map entry: missing '='");
+                        throw new DataException("Map entry is missing '=': " + parser.original());
                     }
                     SchemaAndValue value = parse(parser, true);
                     Object entryValue = value != null ? value.value() : null;
@@ -792,9 +803,9 @@ public class Values {
                 }
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
-                    throw new DataException("Malformed map: missing element after ','");
+                    throw new DataException("Map is missing element after ',': " + parser.original());
                 }
-                throw new DataException("Malformed array: missing terminating ']'");
+                throw new DataException("Map is missing terminating ']': " + parser.original());
             }
         } catch (DataException e) {
             LOG.debug("Unable to parse the value as a map; reverting to string", e);
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 70835c8..c2caf08 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.data;
 
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.Values.Parser;
+import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -159,6 +161,116 @@ public class ValuesTest {
         assertRoundTrip(INT_LIST_SCHEMA, INT_LIST_SCHEMA, INT_LIST);
     }
 
+    /**
+     * The parsed array has byte values and one int value, so we should return list with single unified type of integers.
+     */
+    @Test
+    public void shouldConvertStringOfListWithOnlyNumericElementTypesIntoListOfLargestNumericType() {
+        int thirdValue = Short.MAX_VALUE + 1;
+        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]");
+        assertEquals(3, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals(thirdValue, ((Number) list.get(2)).intValue());
+    }
+
+    /**
+     * The parsed array has byte values and one int value, so we should return list with single unified type of integers.
+     */
+    @Test
+    public void shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes() {
+        String str = "[1, 2, \"three\"]";
+        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, str);
+        assertEquals(3, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals("three", list.get(2));
+    }
+
+    /**
+     * We parse into different element types, but cannot infer a common element schema.
+     */
+    @Test
+    public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
+        String str = "[1, 2, 3, \"four\"]";
+        SchemaAndValue result = Values.parseString(str);
+        assertNull(result.schema());
+        List<?> list = (List<?>) result.value();
+        assertEquals(4, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals(3, ((Number) list.get(2)).intValue());
+        assertEquals("four", list.get(3));
+    }
+
+    /**
+     * We can't infer or successfully parse into a different type, so this returns the same string.
+     */
+    @Test
+    public void shouldParseStringListWithExtraDelimitersAndReturnString() {
+        String str = "[1, 2, 3,,,]";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.STRING, result.schema().type());
+        assertEquals(str, result.value());
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore the blank elements.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToConvertToListFromStringWithExtraDelimiters() {
+        Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3,,,]");
+    }
+
+    /**
+     * Schema of type ARRAY requires a schema for the values, but Connect has no union or "any" schema type.
+     * Therefore, we can't represent this.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToConvertToListFromStringWithNonCommonElementTypeAndBlankElement() {
+        Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3, \"four\",,,]");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore the blank entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 ,, \"bar\" : 0,  \"baz\" : -987654321 }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore the malformed entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMalformedMap() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 , \"a\", \"bar\" : 0,  \"baz\" : -987654321 }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore the blank entries.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { ,,  , , }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore the blank entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  \"1234567890\" ,, \"bar\" : \"0\",  \"baz\" : \"boz\" }  ");
+    }
+
+    /**
+     * Schema for Map requires a schema for key and value, but we have no key or value and Connect has no "any" type
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfEmptyMap() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { }  ");
+    }
+
     @Test
     public void shouldParseStringsWithoutDelimiters() {
         //assertParsed("");

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message