flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hironori Ogibayashi <ogibaya...@gmail.com>
Subject Re: Checkpoint takes long with FlinkKafkaConsumer
Date Tue, 05 Jul 2016 07:24:09 GMT
Hi,

Sorry for my late response.
Actually, I received no response in Kafka mailing list and still
cannot find the root cause.
But when I use FlinkKafkaConsumer082, I do not encounter this issue,
so I will use FlinkKafkaConsumer082.

Thanks
Hironori

2016-06-17 2:59 GMT+09:00 Ufuk Celebi <uce@apache.org>:
> Thanks :)
>
> On Thu, Jun 16, 2016 at 3:21 PM, Hironori Ogibayashi
> <ogibayashi@gmail.com> wrote:
>> Ufuk,
>>
>> Yes, of course. I will be sure to update when I got some more information.
>>
>> Hironori
>>
>> 2016-06-16 1:56 GMT+09:00 Ufuk Celebi <uce@apache.org>:
>>> Hey Hironori,
>>>
>>> thanks for reporting this. Could you please update this thread when
>>> you have more information from the Kafka list?
>>>
>>> – Ufuk
>>>
>>>
>>> On Wed, Jun 15, 2016 at 2:48 PM, Hironori Ogibayashi
>>> <ogibayashi@gmail.com> wrote:
>>>> Kostas,
>>>>
>>>> Thank you for your advise. I have posted my question to the Kafka mailing
list.
>>>> I think Kafka brokers are fine because no errors on producer side with
>>>> 15,000 msg/sec and
>>>> from OS metrics, all of my brokers receives almost the same amount of
>>>> network traffic.
>>>>
>>>> Thanks,
>>>> Hironori
>>>>
>>>>
>>>>
>>>>
>>>> 2016-06-14 22:40 GMT+09:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>> Hello Hironori,
>>>>>
>>>>> The logs just show that you get stuck in the Kafka consumer polling loop,
>>>>> which does not allow the consumer lock to be released. Thus the Flink
>>>>> part of the consumer is never actually called.
>>>>>
>>>>> To my understanding this does not seem to be a Flink issue.
>>>>> Or at least this is not shown from the logs.
>>>>>
>>>>> From googling a bit, I found this:
>>>>>
>>>>> http://stackoverflow.com/questions/35636739/kafka-consumer-marking-the-coordinator-2147483647-dead
>>>>>
>>>>> which relates the problem to network issues.
>>>>>
>>>>> Have you tried posting the problem also to the Kafka mailing list?
>>>>> Can it be that the kafka broker fails and tries to reconnect but does
not
>>>>> make it?
>>>>>
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <ogibayashi@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kostas,
>>>>>
>>>>> I have attached a log file from one of the taskManager. (The same host
>>>>> I executed jstack)
>>>>> I noticed that there are lots of "Marking the coordinator 2147482645
>>>>> dead" message in the log.
>>>>> MyContinuousProcessingTimeTriggerGlobal in the log is my custom
>>>>> trigger which is based on
>>>>> ContinuousProcessingTimeTrigger but clean up windows when it received
>>>>> specific log records.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>> 2016-06-14 21:23 GMT+09:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>
>>>>> Hi Hironori,
>>>>>
>>>>> Could you also provide the logs of the taskManager?
>>>>>
>>>>> As you described, it seems that the consumer is stuck in the polling
loop,
>>>>> although Flink polls with
>>>>> a timeout. This would normally mean that periodically it should release
the
>>>>> lock for the checkpoints to go through.
>>>>>
>>>>> The logs of the task manager can help at clarifying why this does not
>>>>> happen.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <ogibayashi@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kostas,
>>>>>
>>>>> Thank you for your response.
>>>>> Yes, I am using latest Flink, which is 1.0.3.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>
>>>>> Hello Hironori,
>>>>>
>>>>> Are you using the latest Flink version?
>>>>> There were some changes in the FlinkConsumer in the latest releases.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <ogibayashi@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am running Flink job which reads topics from Kafka and write results
>>>>> to Redis. I use FsStatebackend with HDFS.
>>>>>
>>>>> I noticed that taking checkpoint takes serveral minutes and sometimes
>>>>> expires.
>>>>> ---
>>>>> 2016-06-14 17:25:40,734 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Completed checkpoint 1456 (in 257956 ms)
>>>>> 2016-06-14 17:25:40,735 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1457 @ 1465892740734
>>>>> 2016-06-14 17:35:40,735 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1457 expired before completing.
>>>>> 2016-06-14 17:35:40,736 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1458 @ 1465893340735
>>>>> 2016-06-14 17:45:40,736 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1458 expired before completing.
>>>>> 2016-06-14 17:45:40,737 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1459 @ 1465893940736
>>>>> 2016-06-14 17:55:40,738 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1459 expired before completing.
>>>>> 2016-06-14 17:55:40,739 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1460 @ 1465894540738
>>>>> ---
>>>>>
>>>>> According to WebUI, checkpoint size is just 1MB. Why checkpointing
>>>>> takes so long?
>>>>>
>>>>> I took jstack during checkpointing. It looks that checkpointing thread
>>>>> is blocked in commitOffsets.
>>>>>
>>>>> ----
>>>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
>>>>> prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry
>>>>> [0x00007f2b3ddfc000]
>>>>> java.lang.Thread.State: BLOCKED (on object monitor)
>>>>>      at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>>>>      - waiting to lock <0x0000000659111b58> (a
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>>      at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>>>>      at
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>>>>      at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>>>>      - locked <0x0000000659111cc8> (a java.lang.Object)
>>>>>      at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>>>>      at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>>      at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>      at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>> ---
>>>>>
>>>>> Blocker is this.
>>>>>
>>>>> ---
>>>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable
>>>>> [0x00007f2b3dbfa000]
>>>>> java.lang.Thread.State: RUNNABLE
>>>>>      at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>>      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>>      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>>>      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>>>      - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>>>>      - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>>>>      - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>>>>      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>>>      at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>>>>      at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>>>>      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>>>>      at
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>>>>      at
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>>>>      at
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>>>>      at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>>>>      at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>>      at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>>>>      - locked <0x0000000659111b58> (a
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>> ---
>>>>>
>>>>> If someone could advise me of the cause or the way to investigate
>>>>> further, that would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>>
>>>>>
>>>>> <flink-flink-taskmanager-0-FLINK1503.log.gz>
>>>>>
>>>>>

Mime
View raw message