I guess it was happening because I canceled the old job and started it again. After I restarted my cluster, it stopped throwing the error.
But I am still not sure which metric I can use to infer whether backpressure is happening.

Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Wed, Nov 6, 2019 at 4:30 PM Zhijiang <wangzhijiang999@aliyun.com> wrote:
Hi Felipe,

"Buffer pool is destroyed" is mainly caused by a task being canceled. That means some other task failed first, which made the job master cancel all tasks in the topology.
So if you want to find the root cause, you should check the job master log for the first failure, the one that triggered the subsequent cancel operations.
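
As a rough illustration of that log check, here is a minimal sketch that returns the first line reporting a task transition to FAILED. It assumes the job master log reports state changes with the text "switched from <STATE> to FAILED"; the class and method names are hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

class FirstFailureFinder {

    // Returns the first line that reports a transition to FAILED, if any.
    // Assumption: state changes are logged as "... switched from X to FAILED."
    static Optional<String> firstFailure(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains(" switched from ") && line.contains(" to FAILED"))
                .findFirst();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: FirstFailureFinder <job-master-log-file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(firstFailure(lines).orElse("no FAILED transition found"));
    }
}
```

The exception logged right around that line is usually a better starting point than the later "Buffer pool is destroyed" messages, which come from the tasks that were canceled afterwards.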

In addition, which Flink version are you using?

Best,
Zhijiang

------------------------------------------------------------------
From:Felipe Gutierrez <felipe.o.gutierrez@gmail.com>
Send Time:2019 Nov. 6 (Wed.) 19:12
Subject:What metrics can I see the root cause of "Buffer pool is destroyed" message?

Hi community,

Looking at the code [1], it seems to be related to not having any availableMemorySegments left. I am looking at several metrics, but none of them has helped me understand where I can measure the root cause of this error message.

- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not seem to point to a related cause.
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength: my reducer operator always shows a queue length of 4. The pre-aggregate task sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength show my source task at 100% several times. But my error message comes from the pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond DOES show that the pre-aggregate task is consuming a lot. But which metric can I relate this to, so that I know in advance how much is a lot?
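
A minimal sketch of how the pool-usage gauges above might be combined into a backpressure check. It assumes that each task exposes an input and an output pool usage as a value between 0.0 and 1.0. The 0.9 threshold, the class, and the method names are hypothetical and not Flink APIs, and the interpretation (a full output pool means the task is blocked by its consumer; a full input pool with a free output pool means the task itself is slow) is a common reading rather than an official rule.

```java
class BackpressureHeuristic {

    // Hypothetical threshold: treat a buffer pool as "full" at 90% usage or more.
    static final double HIGH = 0.9;

    enum Verdict { OK, BACKPRESSURED, LIKELY_BOTTLENECK }

    // Classifies one task from its input and output pool usage (0.0 to 1.0).
    static Verdict classify(double inPoolUsage, double outPoolUsage) {
        if (outPoolUsage >= HIGH) {
            // Output buffers are exhausted: a downstream task is not draining them.
            return Verdict.BACKPRESSURED;
        }
        if (inPoolUsage >= HIGH) {
            // Input is full while output is free: this task is probably the slow one.
            return Verdict.LIKELY_BOTTLENECK;
        }
        return Verdict.OK;
    }
}
```

Under this reading, a source task whose output pool usage sits at 100% would be classified as BACKPRESSURED, which would point at its consumer (here the pre-aggregate task) rather than at the source itself.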

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265

Thanks for your suggestions and here is my stack trace:

java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez