flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: streaming job reading from kafka stuck while cancelling
Date Wed, 09 Mar 2016 13:42:32 GMT
Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595

On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> Thanks for the debugging this, I think there is in fact an issue in the
> 0.9 consumer.
>
> I'll open a ticket for it, will try to fix that as soon as possible...
>
> Stephan
>
>
> On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <mpr@touk.pl> wrote:
>
>> Hi,
>>
>> from time to time when we cancel streaming jobs (or they are failing for
>> some reason) we encounter:
>>
>> 2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic:
>> (...) ' did not react to cancelling signal, but is stuck in method:
>>  java.lang.Object.wait(Native Method)
>> java.lang.Thread.join(Thread.java:1253)
>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
>>
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> java.lang.Thread.run(Thread.java:745)
>>
>>
>> Now, relevant stacktrace is this:
>>
>> "Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=...  nid=0x2e96 in
>> Object.wait() [0x00007f2bac847000]
>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>         at java.lang.Object.wait(Native Method)
>>         at
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>>         - locked <0x000000041ae00180> (a java.util.ArrayDeque)
>>         at
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>>         - locked <0x00000004be0002f0> (a
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>>         at
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>>         at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>>         at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>>         at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>         at
>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>         at
>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at
>> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>>         at
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at
>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>         - locked <0x000000041ae001c8> (a java.lang.Object)
>>         at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>         - locked <0x000000041ae001c8> (a java.lang.Object)
>>
>> and also:
>> "OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800
>> nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
>>         - waiting to lock <0x00000004be0002f0> (a
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>>         at
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
>>
>>
>> - looks like source tries to process remaining kafka messages, but it is
>> stuck on partitioning by key.
>>
>> I managed to take heap dump and look what's going on inside
>> LocalBufferPool
>> - availableMemorySegments queue is empty
>> - numberOfRequestedMemorySegments == currentPoolSize == 16
>> - there are not registeredListeners
>>
>> Now, it seems that loop in LocalBufferPool#142 without end, waiting for
>> buffer recycle - but from what I see it won't happen because OutputFlusher
>> is blocked by this loop.
>>
>> The problem occurs (it seems) when more or less at the same time as job
>> cancellation we start new job (e.g. taskmanager is restarted, one job is
>> failing because of some problem,
>> and another one is just starting) - so I wonder could it be some problem
>> with setNumBuffers method - although it looks synchronized enough...
>>
>> We are using version 1.0.0 (RC4) btw
>>
>> I hope to dig further into this - but for now this is all I managed to
>> find.
>>
>> thanks,
>> maciek
>>
>
>

Mime
View raw message