kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
Date Tue, 03 Apr 2018 04:20:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423452#comment-16423452 ]

Randall Hauch commented on KAFKA-6728:

Testing with a test source connector reveals the same problem:

[2018-04-02 21:59:28,357] INFO Record: SourceRecord{sourcePartition={my-source=0}, sourceOffset={offset=1}}
ConnectRecord{topic='my-topic', kafkaPartition=null, key=key-1, value=value-1, timestamp=null,
headers=ConnectHeaders(headers=[ConnectHeader(key=my.header, value=MyHeaderValue, schema=Schema{STRING})])}
[2018-04-02 21:59:28,358] INFO MySourceTask.poll() - returning 1 record (com.mycompany.examples.MySourceTask:54)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} flushing 0 outstanding
messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-04-02 21:59:28,360] ERROR WorkerSourceTask{id=my-test-source-0} Task threw an uncaught
and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
[2018-04-02 21:59:28,361] ERROR WorkerSourceTask{id=my-test-source-0} Task is being killed
and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

Adding the following to the connector configuration results in a successful run:


This demonstrates that the problem is not with the message headers (since this test connector
is writing headers correctly using the Connect API), but instead that the NPE results from
the Worker not properly instantiating the HeaderConverter.
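The failure mode described above can be illustrated with a small, self-contained model. This is only a sketch, not the actual Kafka Connect code: {{HeaderConverterLike}}, {{instantiate}}, and {{convertHeader}} are hypothetical names, and the model assumes that the worker creates a converter only when the connector configuration sets {{header.converter}} explicitly. That assumption matches the behavior observed in this test but is not confirmed by it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

final class NullHeaderConverterSketch {
    /** Hypothetical stand-in for a header converter; NOT the Connect interface. */
    interface HeaderConverterLike {
        byte[] fromConnectHeader(String key, String value);
    }

    /**
     * Assumed buggy behavior: a converter is created only when the connector
     * configuration names one explicitly. There is no fallback, so the
     * caller receives null otherwise.
     */
    static HeaderConverterLike instantiate(Map<String, String> connectorConfig) {
        if (!connectorConfig.containsKey("header.converter")) {
            return null;
        }
        // Any non-null converter will do for this illustration.
        return (key, value) -> value.getBytes(StandardCharsets.UTF_8);
    }

    /** Models the step where the task converts a record header. */
    static byte[] convertHeader(HeaderConverterLike converter, String key, String value) {
        // Throws NullPointerException when the converter was never instantiated.
        return converter.fromConnectHeader(key, value);
    }

    public static void main(String[] args) {
        // "com.example.AnyHeaderConverter" is a made-up class name.
        Map<String, String> explicit =
                Collections.singletonMap("header.converter", "com.example.AnyHeaderConverter");
        byte[] bytes = convertHeader(instantiate(explicit), "my.header", "MyHeaderValue");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        try {
            convertHeader(instantiate(Collections.emptyMap()), "my.header", "MyHeaderValue");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: converter was never instantiated");
        }
    }
}
```

In this model the header itself is well formed in both calls; only the presence of the converter differs, which is the distinction the paragraph above draws.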

I then removed the {{header.converter}} property from the connector configuration, temporarily
replaced the {{connect-runtime-1.1.0.jar}} with one built from my aforementioned PR, and tested
manually using a custom source connector that outputs header values, with 8 different combinations
of the {{header.converter}} configuration settings:

# default value
# worker configuration has {{header.converter}} explicitly set to the default
# worker configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in the same plugin
# worker configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin
# connector configuration has {{header.converter}} explicitly set to the default
# connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in the same plugin
# connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin
# worker configuration has {{header.converter}} explicitly set to the default, and the connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin

The worker created the correct {{HeaderConverter}} implementation with the correct configuration
in all of these tests.
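As a rough illustration of what "correct" means across these combinations, the sketch below resolves the converter class name from the two configuration levels. It assumes that a connector-level {{header.converter}} takes precedence over a worker-level one, with {{org.apache.kafka.connect.storage.SimpleHeaderConverter}} as the default when neither is set. The class {{HeaderConverterPrecedenceSketch}}, the method {{resolve}}, and {{com.example.MyHeaderConverter}} are hypothetical and do not come from the PR.

```java
import java.util.Collections;
import java.util.Map;

final class HeaderConverterPrecedenceSketch {
    static final String KEY = "header.converter";
    static final String DEFAULT_CONVERTER =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    /**
     * Returns the converter class name to use, assuming this precedence:
     * connector configuration, then worker configuration, then the default.
     */
    static String resolve(Map<String, String> workerConfig,
                          Map<String, String> connectorConfig) {
        String fromConnector = connectorConfig.get(KEY);
        if (fromConnector != null) {
            return fromConnector;
        }
        String fromWorker = workerConfig.get(KEY);
        if (fromWorker != null) {
            return fromWorker;
        }
        return DEFAULT_CONVERTER;
    }

    public static void main(String[] args) {
        Map<String, String> none = Collections.emptyMap();
        // Hypothetical custom implementation, used only for illustration.
        Map<String, String> custom =
                Collections.singletonMap(KEY, "com.example.MyHeaderConverter");
        Map<String, String> explicitDefault =
                Collections.singletonMap(KEY, DEFAULT_CONVERTER);

        System.out.println(resolve(none, none));              // neither level sets it
        System.out.println(resolve(custom, none));            // worker-level custom
        System.out.println(resolve(explicitDefault, custom)); // connector overrides worker
    }
}
```

The sketch covers only which class name is chosen; the tests above also exercised loading the implementation from the same plugin or from a different one, which this model does not attempt to represent.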

Finally, the default configuration was used with the aforementioned custom source connector
that generated records with headers, and an S3 connector that consumed the records with headers
(but did nothing with them). This test also passed.

> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>         Environment: Linux Mint
>            Reporter: Philippe Hong
>            Priority: Critical
> I am trying to use the newly released Kafka Connect that supports headers by using the standalone connector to write to a text file (so in this case I am only using the sink component).
> I am sadly greeted by a NullPointerException:
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched ZooKeeper and Kafka 1.1.0 locally and sent a ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as with a KafkaConsumer (where in this case I can see the content of the header of the record I sent previously), so no problem there.
> I only made two changes to the Kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for the value.
>      - I also changed the topic where the sink would connect to.
> If I forgot something, please tell me, as this is the first time I am creating an issue on Jira.
