kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value
Date Mon, 19 Mar 2018 16:07:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405043#comment-16405043 ]

Randall Hauch commented on KAFKA-6685:
--------------------------------------

The stack trace often looks something like:

{noformat}
[2018-03-19 11:57:37,419] ERROR WorkerSinkTask{id=elasticsearch-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: MYTOPIC
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:236)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

We should consider wrapping each of [these lines|https://github.com/apache/kafka/blob/e7ef719a5bc0d1276f0e9482d59b25406fda276b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L467-L468]:

{code:java}
            SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
            SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
{code}

in a try block and then reporting a more useful error message for each condition.
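
As a rough illustration only, the idea could look like the sketch below. It is self-contained and does not use the real Connect classes: {{SimpleConverter}}, {{ConversionException}}, the helper method names, and the message format are all hypothetical stand-ins chosen for this example, and an actual change to {{WorkerSinkTask}} would presumably use Connect's own converter and exception types.

{code:java}
import java.nio.charset.StandardCharsets;

// Stand-alone sketch. SimpleConverter and ConversionException are hypothetical
// stand-ins, NOT the real Connect Converter or exception classes.
final class ConversionErrorSketch {

    /** Hypothetical, simplified stand-in for a Connect converter. */
    interface SimpleConverter {
        Object toConnectData(String topic, byte[] data);
    }

    /** Hypothetical exception whose message says which part of the record failed. */
    static final class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Converts the record key; on failure, the message names the key. */
    static Object convertKey(SimpleConverter keyConverter, String topic,
                             int partition, long offset, byte[] key) {
        try {
            return keyConverter.toConnectData(topic, key);
        } catch (RuntimeException e) {
            throw new ConversionException(describe("key", topic, partition, offset), e);
        }
    }

    /** Converts the record value; on failure, the message names the value. */
    static Object convertValue(SimpleConverter valueConverter, String topic,
                               int partition, long offset, byte[] value) {
        try {
            return valueConverter.toConnectData(topic, value);
        } catch (RuntimeException e) {
            throw new ConversionException(describe("value", topic, partition, offset), e);
        }
    }

    // Builds the error text; the wording and fields are an assumption for illustration.
    private static String describe(String part, String topic, int partition, long offset) {
        return "Failed to convert " + part + " of record at topic=" + topic
                + ", partition=" + partition + ", offset=" + offset;
    }

    public static void main(String[] args) {
        SimpleConverter stringConverter =
                (topic, data) -> new String(data, StandardCharsets.UTF_8);
        SimpleConverter failingConverter = (topic, data) -> {
            throw new IllegalStateException("Error retrieving Avro schema for id 1");
        };
        byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);

        try {
            // The key converter fails, so the error should mention the key.
            convertKey(failingConverter, "MYTOPIC", 0, 42L, bytes);
        } catch (ConversionException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
        // The value converts normally.
        System.out.println(convertValue(stringConverter, "MYTOPIC", 0, 42L, bytes));
    }
}
{code}

Keeping the original exception as the cause preserves the existing stack trace, while the outer message adds the key/value distinction and the record coordinates.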

> Connect deserialization log message should distinguish key from value
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-6685
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6685
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Yeva Byzek
>            Priority: Minor
>
> Connect was configured for Avro key and value but data had String key and Avro value. The resulting error message was misleading because it didn't distinguish key from value, and so I was chasing problems with the value instead of the key.
> tl;dr Connect should at least tell you whether the problem is with deserializing the key or value of a record
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
