kafka-jira mailing list archives

From "Hanlin Liu (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-6595) Kafka connect commit offset incorrectly.
Date Tue, 27 Feb 2018 07:53:00 GMT
Hanlin Liu created KAFKA-6595:
---------------------------------

             Summary: Kafka connect commit offset incorrectly.
                 Key: KAFKA-6595
                 URL: https://issues.apache.org/jira/browse/KAFKA-6595
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.10.2.0
            Reporter: Hanlin Liu


SourceTaskOffsetCommitter calls commitOffsets() and waits for all incomplete records to be sent.
If the task is stopped during that wait, the finally block in WorkerSourceTask.execute() calls
commitOffsets() again, which throws the {{Invalid call to OffsetStorageWriter flush() while already
flushing, the framework should not allow this}} exception. This causes the producer to be closed
without waiting for the flush timeout.
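
The collision described above can be modeled with a small sketch. It is not the Kafka Connect code: {{FlushGuardSketch}} and its methods are hypothetical stand-ins that only mimic the "reject a second flush while one is in progress" guard named in the error message.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the "already flushing" guard; not Kafka Connect code. */
public class FlushGuardSketch {
    private final AtomicBoolean flushing = new AtomicBoolean(false);

    /** Starts a flush; rejects a second caller while one is still in progress. */
    public void beginFlush() {
        if (!flushing.compareAndSet(false, true)) {
            throw new IllegalStateException("already flushing");
        }
    }

    /** Marks the in-progress flush as finished. */
    public void finishFlush() {
        flushing.set(false);
    }

    /**
     * Models the periodic committer holding a flush open while the
     * stopping task attempts its own commit.
     */
    public static boolean secondCommitRejected() {
        FlushGuardSketch writer = new FlushGuardSketch();
        writer.beginFlush();          // periodic commit, still waiting on records
        try {
            writer.beginFlush();      // commit attempted by the stopping task
            return false;
        } catch (IllegalStateException e) {
            return true;              // the second caller is rejected
        } finally {
            writer.finishFlush();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondCommitRejected());
    }
}
```

In this model the rejected caller sees an exception even though the first flush is still legitimately in progress, which matches the sequence in the log below.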

After 30 seconds, all incomplete records have been forcefully aborted. If {{offset.flush.timeout.ms}} is
configured to be larger than 30 seconds, WorkerSourceTask treats those aborted records as
sent within the flush timeout, and so incorrectly flushes the source offsets.
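
The accounting problem can be illustrated with a simplified model. This is a sketch under assumptions, not the WorkerSourceTask implementation: {{AbortedRecordSketch}}, its methods, and the "strict" check are all hypothetical, and the strict check is only one possible guard, not necessarily the fix adopted upstream.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of the reported accounting problem; not WorkerSourceTask code. */
public class AbortedRecordSketch {
    private final Set<String> outstanding = new HashSet<>();
    private boolean sawFailure = false;

    /** Registers a record as in flight. */
    public void send(String recordId) {
        outstanding.add(recordId);
    }

    /** Completion callback: error is null on success, non-null on failure or abort. */
    public void onCompletion(String recordId, Exception error) {
        if (error != null) {
            sawFailure = true;
        }
        outstanding.remove(recordId); // removed either way
    }

    /** Naive check: an empty in-flight set is taken to mean "everything was sent". */
    public boolean naiveCanCommit() {
        return outstanding.isEmpty();
    }

    /** Stricter check: also refuses to commit if any record completed with an error. */
    public boolean strictCanCommit() {
        return outstanding.isEmpty() && !sawFailure;
    }

    public static void main(String[] args) {
        AbortedRecordSketch task = new AbortedRecordSketch();
        task.send("r1");
        // The producer is force-closed and the pending record is aborted.
        task.onCompletion("r1", new IllegalStateException("Producer is closed forcefully."));
        System.out.println("naive:  " + task.naiveCanCommit());
        System.out.println("strict: " + task.strictCanCommit());
    }
}
```

With the naive check, an aborted record is indistinguishable from a delivered one once it leaves the in-flight set, so a commit that is still waiting inside its flush timeout proceeds as if the data had been written.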

 
{code:java}
2018-02-27 02:59:33,134 INFO  [] Stopping connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
2018-02-27 02:59:33,134 INFO  [] Stopped connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this   [pool-1-thread-13][OffsetStorageWriter.java:110]
2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 threw an uncaught and unrecoverable exception   [pool-1-thread-13][WorkerTask.java:141]
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:00,734 ERROR [] Graceful stop of task dp-sqlserver-connector-dptask_455-0 failed.   [pool-3-thread-1][Worker.java:405]
2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since pending requests could not be completed within timeout 30 ms.   [pool-1-thread-13][KafkaProducer.java:713]
2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:09,920 INFO  [] Finished WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
