kafka-jira mailing list archives

From "Ismael Juma (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5164) SetSchemaMetadata does not replace the schemas in structs correctly
Date Tue, 25 Jul 2017 11:12:04 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-5164:
    Fix Version/s:     (was: 1.0.0)

> SetSchemaMetadata does not replace the schemas in structs correctly
> -------------------------------------------------------------------
>                 Key: KAFKA-5164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions:
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Randall Hauch
>             Fix For:
> In SetSchemaMetadataTest we verify that the name and version of the schema in the record have been replaced:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62
> However, in the case of Structs, the schema will be attached to both the record and the Struct itself. So we correctly rebuild the Record:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119
> But if the key or value is a Struct, it will still contain the old schema embedded in the Struct.
> Ultimately this can lead to validations in other code failing (even for very simple changes like adjusting the name of a schema):
> {code}
> (org.apache.kafka.connect.runtime.WorkerTask:141)
> org.apache.kafka.connect.errors.DataException: Mismatching struct schema
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The solution to this is probably to check whether we're dealing with a Struct when we use a new schema and potentially copy/reallocate it.
> This particular issue would only appear if we don't modify the data, so I think SetSchemaMetadata is currently the only transformation that would have the issue.
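
To illustrate the problem and the suggested fix, here is a minimal, self-contained Java sketch. It deliberately does not use the Kafka Connect classes: ToySchema, ToyStruct, ToyRecord, validate, and copyWithSchema are hypothetical stand-ins invented for this example, and the validation only imitates the kind of check that could produce a "Mismatching struct schema" error. It assumes a flat struct with no nested structs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class StructSchemaSketch {

    // Hypothetical stand-in for a schema: only a name and a version.
    static final class ToySchema {
        final String name;
        final Integer version;

        ToySchema(String name, Integer version) {
            this.name = name;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ToySchema)) return false;
            ToySchema other = (ToySchema) o;
            return Objects.equals(name, other.name) && Objects.equals(version, other.version);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, version);
        }
    }

    // Hypothetical stand-in for a struct: it keeps its own schema reference.
    static final class ToyStruct {
        final ToySchema schema;
        final Map<String, Object> values = new LinkedHashMap<>();

        ToyStruct(ToySchema schema) {
            this.schema = schema;
        }

        ToyStruct put(String field, Object value) {
            values.put(field, value);
            return this;
        }
    }

    // Hypothetical stand-in for a record: a value schema plus the value itself.
    static final class ToyRecord {
        final ToySchema valueSchema;
        final Object value;

        ToyRecord(ToySchema valueSchema, Object value) {
            this.valueSchema = valueSchema;
            this.value = value;
        }
    }

    // Imitates a downstream check that the struct's schema matches the record's schema.
    static void validate(ToyRecord record) {
        if (record.value instanceof ToyStruct
                && !((ToyStruct) record.value).schema.equals(record.valueSchema)) {
            throw new IllegalStateException("Mismatching struct schema");
        }
    }

    static boolean passesValidation(ToyRecord record) {
        try {
            validate(record);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Problem case: only the record-level schema is replaced; the struct keeps the old one.
    static ToyRecord replaceRecordSchemaOnly(ToyRecord record, ToySchema updated) {
        return new ToyRecord(updated, record.value);
    }

    // Suggested fix: if the value is a struct, reallocate it against the updated schema.
    static Object copyWithSchema(Object value, ToySchema updated) {
        if (value instanceof ToyStruct) {
            ToyStruct copy = new ToyStruct(updated);
            copy.values.putAll(((ToyStruct) value).values);
            return copy;
        }
        return value; // non-struct values carry no embedded schema
    }

    static ToyRecord replaceSchemaEverywhere(ToyRecord record, ToySchema updated) {
        return new ToyRecord(updated, copyWithSchema(record.value, updated));
    }

    public static void main(String[] args) {
        ToySchema oldSchema = new ToySchema("old.Name", 1);
        ToySchema newSchema = new ToySchema("new.Name", 1);
        ToyRecord original = new ToyRecord(oldSchema, new ToyStruct(oldSchema).put("id", 42));

        System.out.println(passesValidation(replaceRecordSchemaOnly(original, newSchema)));
        System.out.println(passesValidation(replaceSchemaEverywhere(original, newSchema)));
    }
}
```

Running main prints false for the record-only replacement and true once the struct is copied against the new schema. The field values are carried over unchanged, which matches the observation that the issue only arises for transformations that do not modify the data.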

This message was sent by Atlassian JIRA
