flink-issues mailing list archives

From "Piotr Nowojski (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
Date Wed, 16 Dec 2020 11:16:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250255#comment-17250255
] 

Piotr Nowojski commented on FLINK-20618:
----------------------------------------

I do not see anything immediately suspicious here. It is expected behaviour for any task
to eventually be blocked under backpressure. As long as the tasks unblock themselves
once the backpressure is gone and the job can make progress, there is no problem.

As [~roman_khachatryan] wrote, a backpressured task will be blocked waiting for the backpressure
to go away before it can make progress. Waiting can be:
# non-blocking (waiting on the mailbox, task thread is idling)
# blocking (task is waiting on a buffer request, task thread is blocked)

The second path is the "legacy" one, which we are trying to avoid as best as we can. It's not
always possible. For example, old/legacy source tasks always use the second path (the [new source interface|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
does not have this problem).
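
To make the two waiting modes concrete, here is a minimal sketch in plain Java. It is not Flink code: {{ToyBufferPool}}, {{requestBlocking}} and {{requestNonBlocking}} are hypothetical names invented for illustration, and a single {{CompletableFuture}} stands in for "a buffer became available". The blocking variant parks the calling thread on the future, similar in spirit to the {{CompletableFuture.get}} frame in the reported stack trace, while the non-blocking variant only registers a continuation and leaves the task thread free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackpressureWaitSketch {

    // Hypothetical stand-in for an exhausted buffer pool; NOT Flink's LocalBufferPool.
    static final class ToyBufferPool {
        private final CompletableFuture<String> available = new CompletableFuture<>();

        CompletableFuture<String> availabilityFuture() {
            return available;
        }

        // Called when downstream consumes data and a buffer is returned to the pool.
        void recycle(String buffer) {
            available.complete(buffer);
        }
    }

    // Blocking path: the calling thread parks until a buffer is available.
    // join() waits like get(), but without checked exceptions.
    static String requestBlocking(ToyBufferPool pool) {
        return pool.availabilityFuture().join();
    }

    // Non-blocking path: register a continuation and return immediately.
    // The continuation runs on the given executor once a buffer is available.
    static CompletableFuture<String> requestNonBlocking(ToyBufferPool pool, Executor taskThread) {
        return pool.availabilityFuture()
                .thenApplyAsync(buffer -> "processed with " + buffer, taskThread);
    }

    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();

        // Non-blocking: the task thread stays idle and could handle other work.
        CompletableFuture<String> nonBlocking = requestNonBlocking(pool, taskThread);
        System.out.println("non-blocking request done? " + nonBlocking.isDone());

        // Blocking: a separate thread (think "legacy source thread") parks on the future.
        Thread legacySource = new Thread(
                () -> System.out.println("blocking request got " + requestBlocking(pool)));
        legacySource.start();

        // Backpressure goes away: a buffer is recycled and both waiters proceed.
        pool.recycle("buffer-1");
        legacySource.join();
        System.out.println(nonBlocking.get());
        taskThread.shutdown();
    }
}
```

In both variants the waiter proceeds as soon as {{recycle}} is called. The difference is only whether a thread is parked in the meantime.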

Regarding the sequence numbers: the sent sequence number comes from the upstream task ({{CreditBasedSequenceNumberingViewReader}}),
and the expected sequence number comes from the downstream task ({{RemoteInputChannel}}). It looks like
in your screenshots one buffer is still in transit. I wouldn't worry about it: as [~roman_khachatryan]
wrote, if there were a problem in this area, the job would fail with {{BufferReorderingException}}.
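
As a rough illustration of why a gap between the two counters is harmless, the sketch below models a sender that numbers buffers and a receiver that checks each arriving number against the one it expects. It is a toy model, not Flink's implementation: {{ToySender}}, {{ToyReceiver}}, {{inFlight}} and the initial counter values are assumptions made for the example, and a plain {{IllegalStateException}} stands in for {{BufferReorderingException}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SequenceNumberSketch {

    // Hypothetical upstream side: assigns an increasing sequence number to each buffer.
    static final class ToySender {
        private int lastSent = -1; // nothing sent yet

        int send(Queue<Integer> wire) {
            lastSent++;
            wire.add(lastSent);
            return lastSent;
        }

        int lastSent() {
            return lastSent;
        }
    }

    // Hypothetical downstream side: accepts buffers only in the expected order.
    static final class ToyReceiver {
        private int expected = 0;

        void onBuffer(int sequenceNumber) {
            if (sequenceNumber != expected) {
                // Stand-in for a reordering failure; a real problem would surface loudly.
                throw new IllegalStateException(
                        "Buffer re-ordering: expected " + expected + " but got " + sequenceNumber);
            }
            expected++;
        }

        int expected() {
            return expected;
        }
    }

    // Buffers that were sent but not yet received.
    static int inFlight(ToySender sender, ToyReceiver receiver) {
        return sender.lastSent() - receiver.expected() + 1;
    }

    public static void main(String[] args) {
        Queue<Integer> wire = new ArrayDeque<>();
        ToySender sender = new ToySender();
        ToyReceiver receiver = new ToyReceiver();

        // Three buffers are sent, only two have arrived so far.
        sender.send(wire);
        sender.send(wire);
        sender.send(wire);
        receiver.onBuffer(wire.poll());
        receiver.onBuffer(wire.poll());

        System.out.println("last sent = " + sender.lastSent()
                + ", expected = " + receiver.expected()
                + ", in flight = " + inFlight(sender, receiver));
    }
}
```

In this model a snapshot taken while a buffer is on the wire shows counters that do not line up, yet nothing is wrong. Only an out-of-order arrival triggers the exception.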


> Some of the source operator subtasks will stuck when flink job in critical backpressure
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.11.0, 1.10.2
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 的屏幕截图.png,
2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png
>
>
> In some critical backpressure situations, some of the source subtasks block
while requesting a buffer because the LocalBufferPool is full, so that whole task is stuck while
the other tasks run well.
> Below is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname,
endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric,
dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname,
endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp,
step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
(timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x00007f43d07e1800
nid=0x1b1c waiting on condition [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at StreamExecCalc$33.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at SourceConversion$4.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
>     at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>  
>  
>  
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric,
dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5,
source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric,
dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname,
endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp,
step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12,
(timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 tid=0x00007f44dc178000
nid=0x1332 waiting for monitor entry [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
>     - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
