kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-6133) NullPointerException in S3 Connector when using rotate.interval.ms
Date Fri, 27 Oct 2017 14:36:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-6133.
----------------------------------
    Resolution: Invalid

Closing this as INVALID, since this is not Apache Kafka code. These connector-specific issues
should be logged in the connector's project, which you've done [here|https://github.com/confluentinc/kafka-connect-storage-cloud/issues/109].

> NullPointerException in S3 Connector when using rotate.interval.ms
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6133
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6133
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Elizabeth Bennett
>
> I just tried out the new rotate.interval.ms feature in the S3 connector to do time-based flushing. I am getting this NPE on every event:
> {code}[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
> java.lang.NullPointerException
>         at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
>         at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
>         at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> [2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
> [2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748){code}
> I dug into the S3 connector code a bit, and it looks like the {{rotate.interval.ms}} feature only works if you are using the TimeBasedPartitioner. It gets the TimestampExtractor class from the TimeBasedPartitioner to determine the timestamp of each event, and uses that for the time-based flushing.
> I'm using a custom partitioner, but I'd still really like to use the {{rotate.interval.ms}} feature, with wall-clock time determining the flushing behavior.
> I'd be willing to work on fixing this issue, but I want to confirm that it is actually a bug, and not that it was specifically designed to work only with the TimeBasedPartitioner. Even if it is the latter, it should probably not crash with an NPE.
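
For illustration only, the behavior the reporter asks for (fall back to wall-clock time when the partitioner supplies no timestamp extractor, rather than dereferencing null) might look roughly like the sketch below. This is not the connector's code: the class {{RotationSketch}}, the interface {{TimestampSource}}, and both method names are hypothetical stand-ins, and the real {{TopicPartitionWriter.rotateOnTime}} logic may well differ.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; none of these names come from the S3 connector.
public class RotationSketch {

    /** Hypothetical stand-in for a timestamp extractor; the reference may be null. */
    interface TimestampSource {
        Long extract(Object record);
    }

    /**
     * Returns the record's timestamp if an extractor is available and yields one,
     * otherwise falls back to the supplied wall-clock time instead of throwing an NPE.
     */
    static long resolveTimestamp(TimestampSource extractor, Object record, LongSupplier wallClock) {
        if (extractor == null) {
            return wallClock.getAsLong();
        }
        Long ts = extractor.extract(record);
        return ts != null ? ts : wallClock.getAsLong();
    }

    /** True once at least rotateIntervalMs has elapsed; a non-positive interval disables rotation. */
    static boolean shouldRotate(long fileOpenedAtMs, long nowMs, long rotateIntervalMs) {
        return rotateIntervalMs > 0 && nowMs - fileOpenedAtMs >= rotateIntervalMs;
    }
}
```

Passing the clock in as a {{LongSupplier}} (for example {{System::currentTimeMillis}}) is only a design choice that keeps the sketch easy to test; whether wall-clock fallback is the right semantics for the connector is exactly the open question raised above.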



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
