kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5996; JsonConverter generates Mismatching schema DataException (#4523)
Date Wed, 14 Feb 2018 16:46:26 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d902b6b  KAFKA-5996; JsonConverter generates Mismatching schema DataException (#4523)
d902b6b is described below

commit d902b6b4b6bcf1cc1c488f6a975db8beed5689a6
Author: ConcurrencyPractitioner <yohan.richard.yu@gmail.com>
AuthorDate: Wed Feb 14 08:46:22 2018 -0800

    KAFKA-5996; JsonConverter generates Mismatching schema DataException (#4523)
    
    JsonConverter should use object equality rather than reference equality in `convertToJson`.
    
    Reviewers: Bartlomiej Tartanus <bartektartanus@gmail.com>, Randall Hauch <rhauch@gmail.com>,
Jason Gustafson <jason@confluent.io>
---
 .../org/apache/kafka/connect/json/JsonConverter.java |  2 +-
 .../apache/kafka/connect/json/JsonConverterTest.java | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 32ded44..c1322b1 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -673,7 +673,7 @@ public class JsonConverter implements Converter, HeaderConverter {
                 }
                 case STRUCT: {
                     Struct struct = (Struct) value;
-                    if (struct.schema() != schema)
+                    if (!struct.schema().equals(schema))
                         throw new DataException("Mismatching schema.");
                     ObjectNode obj = JsonNodeFactory.instance.objectNode();
                     for (Field field : schema.fields()) {
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 0a71044..7686fdb 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -563,6 +563,20 @@ public class JsonConverterTest {
                 converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
 
+    @Test
+    public void structSchemaIdentical() {
+        Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA)
+                                              .field("field2", Schema.STRING_SCHEMA)
+                                              .field("field3", Schema.STRING_SCHEMA)
+                                              .field("field4", Schema.BOOLEAN_SCHEMA).build();
+        Schema inputSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA)
+                                                   .field("field2", Schema.STRING_SCHEMA)
+                                                   .field("field3", Schema.STRING_SCHEMA)
+                                                   .field("field4", Schema.BOOLEAN_SCHEMA).build();
+        Struct input = new Struct(inputSchema).put("field1", true).put("field2", "string2").put("field3",
"string3").put("field4", false);
+        assertStructSchemaEqual(schema, input);
+    }
+
 
     @Test
     public void decimalToJson() throws IOException {
@@ -735,7 +749,6 @@ public class JsonConverterTest {
 
         JsonConverter rc = new JsonConverter();
         rc.configure(workerProps, false);
-
     }
 
 
@@ -791,4 +804,9 @@ public class JsonConverterTest {
         assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
         assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
+    
+    private void assertStructSchemaEqual(Schema schema, Struct struct) {
+        converter.fromConnectData(TOPIC, schema, struct);
+        assertEquals(schema, struct.schema());
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message