kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4183) Logical converters in JsonConverter don't properly handle null values
Date Sun, 25 Feb 2018 21:24:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376249#comment-16376249 ]

ASF GitHub Bot commented on KAFKA-4183:
---------------------------------------

hachikuji closed pull request #1871: KAFKA-4183 Corrected Kafka Connect's JSON Converter to
properly convert from null to logical values
URL: https://github.com/apache/kafka/pull/1871
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index d9a685953d2..b35b24aeec3 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -211,6 +211,7 @@ public Object convert(Schema schema, JsonNode value) {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof byte[]))
                     throw new DataException("Invalid type for Decimal, underlying representation
should be bytes but was " + value.getClass());
                 return Decimal.toLogical(schema, (byte[]) value);
@@ -220,6 +221,7 @@ public Object convert(Schema schema, Object value) {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Date, underlying representation
should be int32 but was " + value.getClass());
                 return Date.toLogical(schema, (int) value);
@@ -229,6 +231,7 @@ public Object convert(Schema schema, Object value) {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Integer))
                     throw new DataException("Invalid type for Time, underlying representation
should be int32 but was " + value.getClass());
                 return Time.toLogical(schema, (int) value);
@@ -238,6 +241,7 @@ public Object convert(Schema schema, Object value) {
         TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter()
{
             @Override
             public Object convert(Schema schema, Object value) {
+                if (value == null) return checkOptionalAndDefault(schema);
                 if (!(value instanceof Long))
                     throw new DataException("Invalid type for Timestamp, underlying representation
should be int64 but was " + value.getClass());
                 return Timestamp.toLogical(schema, (long) value);
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index c9232853067..7700f18c6dc 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -54,6 +54,7 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -216,6 +217,16 @@ public void decimalToConnect() {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void decimalToConnectOptional() {
+        Schema schema = Decimal.builder(2).optional().schema();
+        // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding
of 156.
+        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\",
\"version\": 1, \"optional\": true, \"parameters\": { \"scale\": \"2\" } }, \"payload\": null
}";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     @Test
     public void dateToConnect() {
         Schema schema = Date.SCHEMA;
@@ -230,6 +241,15 @@ public void dateToConnect() {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void dateToConnectOptional() {
+        Schema schema = Date.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Date\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     @Test
     public void timeToConnect() {
         Schema schema = Time.SCHEMA;
@@ -244,6 +264,15 @@ public void timeToConnect() {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void timeToConnectOptional() {
+        Schema schema = Time.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.connect.data.Time\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     @Test
     public void timestampToConnect() {
         Schema schema = Timestamp.SCHEMA;
@@ -259,6 +288,15 @@ public void timestampToConnect() {
         assertEquals(reference, converted);
     }
 
+    @Test
+    public void timestampToConnectOptional() {
+        Schema schema = Timestamp.builder().optional().schema();
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.connect.data.Timestamp\",
\"version\": 1, \"optional\": true }, \"payload\": null }";
+        SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
+        assertEquals(schema, schemaAndValue.schema());
+        assertNull(schemaAndValue.value());
+    }
+
     // Schema metadata
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Logical converters in JsonConverter don't properly handle null values
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-4183
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4183
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.1
>            Reporter: Randall Hauch
>            Assignee: Shikhar Bhushan
>            Priority: Major
>             Fix For: 0.10.1.0
>
>
> The {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}} map contains {{LogicalTypeConverter}}
implementations to convert from the raw value into the corresponding logical type value, and
they are used during deserialization of message keys and/or values. However, these implementations
do not handle the case when the input raw value is null, which can happen when a key or value
has a schema that is _optional_ or that contains an _optional_ field.
> Consider a Kafka Connect schema of type STRUCT that contains a field "date" with an optional
schema of type {{org.apache.kafka.connect.data.Date}}. When the key or value with this schema
contains a null "date" field and is serialized, the logical serializer will properly serialize
the null value. However, upon _deserialization_, the {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}}
are used to convert the literal value (which is null) to a logical value. All of the {{JsonConverter.TO_CONNECT_LOGICAL_CONVERTERS}}
implementations will throw a NullPointerException when the input value is null. 
> For example:
> {code:java}
> java.lang.NullPointerException
> 	at org.apache.kafka.connect.json.JsonConverter$14.convert(JsonConverter.java:224)
> 	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:731)
> 	at org.apache.kafka.connect.json.JsonConverter.access$100(JsonConverter.java:53)
> 	at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:200)
> 	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:727)
> 	at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:354)
> 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:343)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message