flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrey Melentyev <andrey.melent...@gmail.com>
Subject Re: Flink streaming job with iterations gets stuck waiting for network buffers
Date Fri, 14 Apr 2017 12:38:26 GMT
Thanks Paris and Gábor!

The throttling workaround considering it's disadvantages is not very
exactly optimal for production deployment so I won't be trying that out.
I'll keep an eye on the FLIP-15 progress instead. Feel free to reach out to
me if you want to validate the FLIP-15 proposal implementation on my
workflow.

Regards
Andrey

On Mon, Apr 3, 2017 at 9:43 PM, Paris Carbone <parisc@kth.se> wrote:

> +1
> you can do custom application level flow control,if you feel adventurous,
> to lower the possibility of a deadlock, that combined with more allocated
> network buffers.
>
> We will give our best to speed up FLIP-15. A large part of the
> functionality is already implemented as you can see in the PRs. The hard
> part is to get a final version of the proposal decided and accepted by the
> community which typically takes time depending on the general interest and
> difficulty of the task.
>
> On 3 Apr 2017, at 19:37, Gábor Hermann <mail@gaborhermann.com> wrote:
>
> Hi Andrey,
>
> As Paris has explained it, this is a known issue and there are ongoing
> efforts to solve it.
>
> I can suggest a workaround: limit the amount of messages sent into the
> iteration manually. You can do this with a e.g. a Map operator that limits
> records per seconds and simply sends what it has received. You can check at
> every incoming record whether the limit has been reached, and if so
> Thread.sleep until the next second. You could place Map operator before the
> operator that ingests data into the iteration (operator with ID 9 in your
> dataflow graph). This way you can avoid overloading the network inside the
> iteration, and thus avoid deadlock caused by backpressure.
>
> This approach is, of course, a bit hacky. Also, it does not eliminate the
> possibility of a deadlock entirely. Other disadvantage is that you have to
> manually tune the rate of ingesting. That could depend on lot of things:
> the data load, the number of operator instances, the placement of operator
> instances, etc. But I have used something like this as a temporary
> workaround until we see more progress with FLIP-15.
>
> Cheers,
> Gabor
>
>
> On 2017-04-03 13:33, Paris Carbone wrote:
>
> Hi Andrey,
>
> If I am not mistaken this sounds like a known deadlock case and can be
> caused by the combination of Flink's backpressure mechanism with iterations
> (more likely when there is heavy feedback load).
> Keep in mind that, currently, iterations are (perhaps the only) not stable
> feature to use. The good news is that there is a complete redesign planned
> for it (partly FLIP-15 [1]) that has to entirely address this pending flow
> control issue as well.
>
> Increasing network buffers or feedback queue capacity to a really high
> number decreases the possibility of the deadlock but does not eliminate it.
> I really cannot think of a quick solution to the problem that does not
> involve some deep changes.
>
> I am CCing dev since this seems like a very relevant use case to revive
> the discussion for the loops redesign and also keep you in the loop (no pun
> intended) regarding this specific issue.
> Will also update FLIP-15 with several interesting proposals under
> discussion from Stephan to tackle this issue.
>
> cheers,
> Paris
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 15+Scoped+Loops+and+Job+Termination
>
>
> On 3 Apr 2017, at 12:54, Andrey Melentyev <andrey.melentyev@gmail.com<ma
> ilto:andrey.melentyev@gmail.com <andrey.melentyev@gmail.com>>> wrote:
>
> Hi,
>
> I have a Flink 1.2.0 streaming job using a number of stateful operators
> and an iteration loop with a RichFlatMapFunction inside. On the high level,
> the app reads some data, massages it and feeds into an iterative algorithm
> which produces some output and feedback while keeping the state. All
> stateful operators are on KeyedStreams. Input is some data on file system
> and output is stdout.
>
> The implementation passes functional tests but when tested with noticeable
> amounts of input data (tens of thousands records, dozens of MB raw data)
> after a few seconds of good throughput, backpressure kicks in and the
> application essentially gets stuck: most of the threads are blocked waiting
> for buffers, occasional message gets processed every few minutes. There's
> nothing strange in the log files.
>
> The behaviour is reproducible both in local execution environment and in
> Flink standalone cluster (started using jobmanager.sh and taskmanager.sh)
>
> The problematic part is likely in the iterations since the part of the job
> before iterations works fine with the same data.
>
> I would appreciate pointers as to how to debug this. taskmanager.network.numberOfBuffers
> from the config sounds relevant but the default value of 2048 is already
> much higher than slots-per-TM^2 * #TMs * 4 = 4^2 * 1 * 4 = 64.
>
> Attaching flink config, job execution plan and thread dump with some
> sensitive parts retracted.
>
> flink-conf.yml
>
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 512
> taskmanager.heap.mb: 8192
> taskmanager.numberOfTaskSlots: 4
> taskmanager.memory.preallocate: false
> parallelism.default: 4
> jobmanager.web.port: 8081
> state.backend: rocksdb
> state.backend.fs.checkpointdir: file:///Users/andrey.melentyev/tmp/flink-
> checkpoints
>
> Job execution plan
>
> {
>   "nodes": [
>     {
>       "contents": "IterationSource-10",
>       "id": -1,
>       "pact": "Data Source",
>       "parallelism": 8,
>       "type": "IterationSource-10"
>     },
>     {
>       "contents": "Source: Custom File Source",
>       "id": 1,
>       "pact": "Data Source",
>       "parallelism": 1,
>       "type": "Source: Custom File Source"
>     },
>     {
>       "contents": "Split Reader: Custom File Source",
>       "id": 2,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 1,
>           "ship_strategy": "REBALANCE",
>           "side": "second"
>         }
>       ],
>       "type": "Split Reader: Custom File Source"
>     },
>     {
>       "contents": "Parse JSON",
>       "id": 3,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 2,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Parse JSON"
>     },
>     {
>       "contents": "Split records",
>       "id": 4,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 3,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Split records (Stateless)"
>     },
>     {
>       "contents": "Produce Some Data",
>       "id": 6,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 3,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Produce Some Data (Stateless)"
>     },
>     {
>       "contents": "Produce Some More Data (Stateful)",
>       "id": 7,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 4,
>           "ship_strategy": "HASH",
>           "side": "second"
>         }
>       ],
>       "type": "Produce Some More Data (Stateful)"
>     },
>     {
>       "contents": "Map",
>       "id": 9,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 6,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         },
>         {
>           "id": 7,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Map"
>     },
>     {
>       "contents": "Iteration Step Function",
>       "id": 12,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 9,
>           "ship_strategy": "HASH",
>           "side": "second"
>         },
>         {
>           "id": -1,
>           "ship_strategy": "HASH",
>           "side": "second"
>         }
>       ],
>       "type": "Iteration Step Function"
>     },
>     {
>       "contents": "Flat Map / Iteration Feedback",
>       "id": 13,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 12,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Flat Map / Iteration Feedback"
>     },
>     {
>       "contents": "Iterative Algorithm (Stateful)",
>       "id": 14,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 12,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Iterative Algorithm (Stateful)"
>     },
>     {
>       "contents": "Flat Map / Iteration Result",
>       "id": 15,
>       "pact": "Operator",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 14,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Flat Map / Iteration Result"
>     },
>     {
>       "contents": "IterationSink-10",
>       "id": -2,
>       "pact": "Data Sink",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 13,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "IterationSink-10"
>     },
>     {
>       "contents": "Sink: stdout",
>       "id": 16,
>       "pact": "Data Sink",
>       "parallelism": 8,
>       "predecessors": [
>         {
>           "id": 15,
>           "ship_strategy": "FORWARD",
>           "side": "second"
>         }
>       ],
>       "type": "Sink: stdout"
>     }
>   ]
> }
>
>
> Thread dump
>
> 2017-04-03 12:15:51
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b16 mixed mode):
>
> "Thread-22" #531 daemon prio=5 os_prio=31 tid=0x00007f9ca0054000
> nid=0x140af in Object.wait() [0x0000700008d6f000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ab00e210> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ab0605b0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$
> NonTimestampContext.collect(StreamSourceContexts.java:84)
> - locked <0x00000007ab00ccc8> (a java.lang.Object)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.
> java:321)
> - locked <0x00000007ab00ccc8> (a java.lang.Object)
>
> "Thread-21" #530 daemon prio=5 os_prio=31 tid=0x00007f9c9f019800
> nid=0xb61f in Object.wait() [0x000070000a8c0000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ab86fb30> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ab8c3800> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at play.api.libs.json.JsResult$class.fold(JsResult.scala:72)
> at play.api.libs.json.JsSuccess.fold(JsResult.scala:9)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$
> NonTimestampContext.collect(StreamSourceContexts.java:84)
> - locked <0x00000007ab86e5e8> (a java.lang.Object)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.
> java:321)
> - locked <0x00000007ab86e5e8> (a java.lang.Object)
>
> "Thread-20" #529 daemon prio=5 os_prio=31 tid=0x00007f9c9ff9b000
> nid=0x13217 in Object.wait() [0x000070000a7bd000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007aae18048> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007aae6c6e8> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at play.api.libs.json.JsResult$class.fold(JsResult.scala:72)
> at play.api.libs.json.JsSuccess.fold(JsResult.scala:9)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$
> NonTimestampContext.collect(StreamSourceContexts.java:84)
> - locked <0x00000007aae16b00> (a java.lang.Object)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.
> java:321)
> - locked <0x00000007aae16b00> (a java.lang.Object)
>
> "Thread-19" #528 daemon prio=5 os_prio=31 tid=0x00007f9c9c02e000
> nid=0xe31b in Object.wait() [0x000070000a6ba000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ab60e208> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ab661430> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at play.api.libs.json.JsResult$class.fold(JsResult.scala:72)
> at play.api.libs.json.JsSuccess.fold(JsResult.scala:9)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$
> NonTimestampContext.collect(StreamSourceContexts.java:84)
> - locked <0x00000007ab60ccc0> (a java.lang.Object)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.
> java:321)
> - locked <0x00000007ab60ccc0> (a java.lang.Object)
>
> "OutputFlusher" #527 daemon prio=5 os_prio=31 tid=0x00007f9c9f042000
> nid=0xa313 waiting on condition [0x000070000a5b7000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:173)
>
> "OutputFlusher" #512 daemon prio=5 os_prio=31 tid=0x00007f9c9f8fc000
> nid=0xeb13 waiting for monitor entry [0x000070000a4b4000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a8e785c0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #515 daemon prio=5 os_prio=31 tid=0x00007f9c9ff9a800
> nid=0x15333 waiting for monitor entry [0x000070000a3b1000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a80609b0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #526 daemon prio=5 os_prio=31 tid=0x00007f9c9c02d800
> nid=0x9f13 waiting on condition [0x000070000a2ae000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:173)
>
> "OutputFlusher" #520 daemon prio=5 os_prio=31 tid=0x00007f9c9f9dc000
> nid=0x13c3f waiting for monitor entry [0x000070000a1ab000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007aa254c48> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #524 daemon prio=5 os_prio=31 tid=0x00007f9c9dbb4800
> nid=0x1012b waiting for monitor entry [0x0000700009fa5000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ab0605b0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #523 daemon prio=5 os_prio=31 tid=0x00007f9c9d804000
> nid=0x1095f waiting on condition [0x0000700009ea2000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:173)
>
> "OutputFlusher" #522 daemon prio=5 os_prio=31 tid=0x00007f9c9cc8b000
> nid=0x1311f waiting for monitor entry [0x0000700009d9f000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007abe4f398> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #521 daemon prio=5 os_prio=31 tid=0x00007f9ca08de000
> nid=0xf70f waiting for monitor entry [0x0000700009b99000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ab8c3800> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #519 daemon prio=5 os_prio=31 tid=0x00007f9c9d803800
> nid=0xbe13 waiting for monitor entry [0x0000700009a96000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a86a53f0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #518 daemon prio=5 os_prio=31 tid=0x00007f9c9f39c800
> nid=0xb517 waiting for monitor entry [0x0000700009993000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ac27c5e0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #517 daemon prio=5 os_prio=31 tid=0x00007f9c9c07a800
> nid=0xaa1b waiting for monitor entry [0x0000700009890000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007aae6c6e8> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #516 daemon prio=5 os_prio=31 tid=0x00007f9c9d802800
> nid=0xc713 waiting for monitor entry [0x000070000978d000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a9c4bf48> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #514 daemon prio=5 os_prio=31 tid=0x00007f9c9cc90000
> nid=0x15a33 waiting for monitor entry [0x000070000968a000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ac46b850> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #513 daemon prio=5 os_prio=31 tid=0x00007f9c9d839000
> nid=0xe80f waiting on condition [0x0000700009587000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:173)
>
> "OutputFlusher" #511 daemon prio=5 os_prio=31 tid=0x00007f9c9f9da800
> nid=0x132b waiting for monitor entry [0x0000700009484000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007aa914af0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #503 daemon prio=5 os_prio=31 tid=0x00007f9ca078c800
> nid=0xcf0f waiting for monitor entry [0x0000700009381000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ac873cd0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #510 daemon prio=5 os_prio=31 tid=0x00007f9c9f39c000
> nid=0x1021b waiting for monitor entry [0x000070000927e000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a94632e0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #509 daemon prio=5 os_prio=31 tid=0x00007f9c9fe14800
> nid=0x14943 waiting for monitor entry [0x000070000917b000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a907b6b8> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #508 daemon prio=5 os_prio=31 tid=0x00007f9ca0055000
> nid=0x13897 waiting for monitor entry [0x0000700009078000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a984d400> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #507 daemon prio=5 os_prio=31 tid=0x00007f9c9fe16800
> nid=0x11e0f waiting for monitor entry [0x0000700008f75000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007ab661430> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #506 daemon prio=5 os_prio=31 tid=0x00007f9c9d9f4800
> nid=0x15c67 waiting for monitor entry [0x0000700008e72000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007aa451428> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #504 daemon prio=5 os_prio=31 tid=0x00007f9c9e0af000
> nid=0x1293b waiting for monitor entry [0x0000700008c6c000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a7c8df08> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "OutputFlusher" #502 daemon prio=5 os_prio=31 tid=0x00007f9ca00a1800
> nid=0x128a7 waiting for monitor entry [0x0000700008b69000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(
> RecordWriter.java:175)
> - waiting to lock <0x00000007a8c89000> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$
> OutputFlusher.run(StreamRecordWriter.java:185)
>
> "CloseableReaperThread" #501 daemon prio=5 os_prio=31
> tid=0x00007f9ca0990800 nid=0xf013 in Object.wait() [0x0000700008a66000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a72017e8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "CloseableReaperThread" #500 daemon prio=5 os_prio=31
> tid=0x00007f9c9f147800 nid=0xdd0f in Object.wait() [0x0000700008963000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a7001010> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSink-10 (4/4)" #499 daemon prio=5 os_prio=31
> tid=0x00007f9ca0990000 nid=0xa913 waiting on condition [0x0000700008860000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000007abe79428> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.offer(
> ArrayBlockingQueue.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:85)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> RecordPusher.processElement(StreamIterationTail.java:71)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a720e830> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "IterationSink-10 (3/4)" #498 daemon prio=5 os_prio=31
> tid=0x00007f9c9d98b000 nid=0xd417 waiting on condition [0x000070000875d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000007ac2a63b8> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.offer(
> ArrayBlockingQueue.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:85)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> RecordPusher.processElement(StreamIterationTail.java:71)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a700bc80> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #497 daemon prio=5 os_prio=31
> tid=0x00007f9c9f81d000 nid=0x1071b in Object.wait() [0x000070000865a000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a7401010> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSink-10 (2/4)" #496 daemon prio=5 os_prio=31
> tid=0x00007f9c9f81c800 nid=0xf617 waiting on condition [0x0000700008557000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000007ac495778> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.offer(
> ArrayBlockingQueue.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:85)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> RecordPusher.processElement(StreamIterationTail.java:71)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a740bc80> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #495 daemon prio=5 os_prio=31
> tid=0x00007f9c9f147000 nid=0xf213 in Object.wait() [0x0000700008454000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a78017f0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSink-10 (1/4)" #494 daemon prio=5 os_prio=31
> tid=0x00007f9c9f9b0000 nid=0x11563 waiting on condition [0x0000700008351000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000007ac89dd48> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.offer(
> ArrayBlockingQueue.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> IterationTailOutput.collect(StreamIterationTail.java:85)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationTail$
> RecordPusher.processElement(StreamIterationTail.java:71)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a780c460> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #493 daemon prio=5 os_prio=31
> tid=0x00007f9ca003a800 nid=0x11427 in Object.wait() [0x000070000824e000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a7c015b8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "CloseableReaperThread" #492 daemon prio=5 os_prio=31
> tid=0x00007f9c9f146000 nid=0xd517 in Object.wait() [0x000070000814b000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a80015b8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Iteration Step Function -> (Flat Map, Iterative Algorithm (Stateful) ->
> Flat Map -> Sink: stdout) (4/4)" #491 daemon prio=5 os_prio=31
> tid=0x00007f9c9c07a000 nid=0x10f1b in Object.wait() [0x0000700008047000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a7c0e6e0> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a7c8df08> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a7c0cd98> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #490 daemon prio=5 os_prio=31
> tid=0x00007f9ca0039800 nid=0xb00f in Object.wait() [0x0000700007f45000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a86179d0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Iteration Step Function -> (Flat Map, Iterative Algorithm (Stateful) ->
> Flat Map -> Sink: stdout) (3/4)" #489 daemon prio=5 os_prio=31
> tid=0x00007f9c9c015800 nid=0x14d4f in Object.wait() [0x0000700007e41000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a800e6e0> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a80609b0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a800cd98> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "Iteration Step Function -> (Flat Map, Iterative Algorithm (Stateful) ->
> Flat Map -> Sink: stdout) (2/4)" #488 daemon prio=5 os_prio=31
> tid=0x00007f9c9d98a000 nid=0xdc0f in Object.wait() [0x0000700007d3e000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a8624af8> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a86a53f0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a86231b0> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #487 daemon prio=5 os_prio=31
> tid=0x00007f9c9ccd5800 nid=0xd91b in Object.wait() [0x0000700007c3c000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a8e19be8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Iteration Step Function -> (Flat Map, Iterative Algorithm (Stateful) ->
> Flat Map -> Sink: stdout) (1/4)" #485 daemon prio=5 os_prio=31
> tid=0x00007f9c99f9b800 nid=0xda1b in Object.wait() [0x0000700007b38000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a8e26d10> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a8e785c0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$
> anonfun$flatMap$1.apply(DataStream.scala:577)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.
> flatMap(DataStream.scala:577)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:422)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:407)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:462)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:430)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a8e253c8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #486 daemon prio=5 os_prio=31
> tid=0x00007f9c9f145800 nid=0xe70f in Object.wait() [0x0000700007a36000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a8c00e58> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Map (4/4)" #484 daemon prio=5 os_prio=31 tid=0x00007f9ca0956000
> nid=0xea13 in Object.wait() [0x0000700007933000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a8c0cc38> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a8c89000> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamMap.
> processElement(StreamMap.java:38)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a8c0ba38> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #483 daemon prio=5 os_prio=31
> tid=0x00007f9c9e03c800 nid=0x157c3 in Object.wait() [0x0000700007830000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a9000e58> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Map (3/4)" #482 daemon prio=5 os_prio=31 tid=0x00007f9c9e0c8000
> nid=0xd70f in Object.wait() [0x000070000772d000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a900cc20> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a907b6b8> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamMap.
> processElement(StreamMap.java:38)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a900ba20> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #481 daemon prio=5 os_prio=31
> tid=0x00007f9c9daff800 nid=0x11f13 in Object.wait() [0x000070000762a000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a9400e58> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Map (2/4)" #480 daemon prio=5 os_prio=31 tid=0x00007f9ca0955000
> nid=0xac17 in Object.wait() [0x0000700007527000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a940cc38> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a94632e0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamMap.
> processElement(StreamMap.java:38)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a940ba38> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #479 daemon prio=5 os_prio=31
> tid=0x00007f9ca0954800 nid=0xc30f in Object.wait() [0x0000700007424000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a9800e58> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Map (1/4)" #478 daemon prio=5 os_prio=31 tid=0x00007f9c9ca80000
> nid=0x15b0f in Object.wait() [0x0000700007321000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a980ccc8> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a984d400> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.StreamMap.
> processElement(StreamMap.java:38)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a980bac8> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #477 daemon prio=5 os_prio=31
> tid=0x00007f9c9c014800 nid=0x10a1f in Object.wait() [0x000070000721e000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007a9c017d0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Cross Edges (4/4)" #476 daemon prio=5 os_prio=31 tid=0x00007f9c9d86a000
> nid=0x10413 in Object.wait() [0x000070000711b000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007a9c0d600> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007a9c4bf48> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.immutable.List.foreach(List.scala:381)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007a9c0c3f0> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #475 daemon prio=5 os_prio=31
> tid=0x00007f9c9c017800 nid=0x9e0f in Object.wait() [0x0000700007018000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007aa200ff8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Cross Edges (3/4)" #474 daemon prio=5 os_prio=31 tid=0x00007f9ca0938800
> nid=0xe013 in Object.wait() [0x0000700006f15000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007aa20ce28> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007aa254c48> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.immutable.List.foreach(List.scala:381)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007aa20bc18> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #473 daemon prio=5 os_prio=31
> tid=0x00007f9c9f03d800 nid=0x13517 in Object.wait() [0x0000700006e12000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007aa401000> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Cross Edges (2/4)" #472 daemon prio=5 os_prio=31 tid=0x00007f9c9f03c800
> nid=0xc50f in Object.wait() [0x0000700006d0f000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007aa40ce30> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007aa451428> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.immutable.List.foreach(List.scala:381)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007aa40bc20> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #471 daemon prio=5 os_prio=31
> tid=0x00007f9c9c0cd000 nid=0xab17 in Object.wait() [0x0000700006c0c000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007aa8017d0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Cross Edges (1/4)" #470 daemon prio=5 os_prio=31 tid=0x00007f9c99f9d000
> nid=0xc40f in Object.wait() [0x0000700006b09000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007aa80d600> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007aa914af0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:39)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:797)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> ...
> at scala.collection.immutable.List.foreach(List.scala:381)
> ...
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:185)
> - locked <0x00000007aa80c3f0> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #469 daemon prio=5 os_prio=31
> tid=0x00007f9ca0e75800 nid=0x1002f in Object.wait() [0x0000700006a06000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007aae0b7a0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Split Reader: Custom File Source -> Parse JSON -> (Explode Assets, Self
> Edges) (4/4)" #468 daemon prio=5 os_prio=31 tid=0x00007f9c9c828000
> nid=0xa113 in Object.wait() [0x0000700006903000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator.close(ContinuousFileReaderOperator.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> closeAllOperators(StreamTask.java:405)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:294)
> - locked <0x00000007aae16b00> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #467 daemon prio=5 os_prio=31
> tid=0x00007f9c9f80a000 nid=0xc613 in Object.wait() [0x0000700006800000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ab001968> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Split Reader: Custom File Source -> Parse JSON -> (Explode Assets, Self
> Edges) (3/4)" #466 daemon prio=5 os_prio=31 tid=0x00007f9c9f09e000
> nid=0xa013 in Object.wait() [0x00007000066fd000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator.close(ContinuousFileReaderOperator.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> closeAllOperators(StreamTask.java:405)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:294)
> - locked <0x00000007ab00ccc8> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #465 daemon prio=5 os_prio=31
> tid=0x00007f9c9e0c9800 nid=0xe10f in Object.wait() [0x00007000065fa000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ab601960> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Split Reader: Custom File Source -> Parse JSON -> (Explode Assets, Self
> Edges) (2/4)" #464 daemon prio=5 os_prio=31 tid=0x00007f9c9e225000
> nid=0xdb13 in Object.wait() [0x00007000064f7000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator.close(ContinuousFileReaderOperator.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> closeAllOperators(StreamTask.java:405)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:294)
> - locked <0x00000007ab60ccc0> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #463 daemon prio=5 os_prio=31
> tid=0x00007f9ca0809800 nid=0xd817 in Object.wait() [0x00007000063f4000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ab863008> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "Split Reader: Custom File Source -> Parse JSON -> (Explode Assets, Self
> Edges) (1/4)" #462 daemon prio=5 os_prio=31 tid=0x00007f9ca0817800
> nid=0x9d17 in Object.wait() [0x00007000062f1000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.flink.streaming.api.functions.source.
> ContinuousFileReaderOperator.close(ContinuousFileReaderOperator.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> closeAllOperators(StreamTask.java:405)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:294)
> - locked <0x00000007ab86e5e8> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #461 daemon prio=5 os_prio=31
> tid=0x00007f9c9e0c6800 nid=0xb90f in Object.wait() [0x00007000061ee000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007abe017f8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSource-10 (4/4)" #460 daemon prio=5 os_prio=31
> tid=0x00007f9c99fa2000 nid=0xb80f in Object.wait() [0x00007000060eb000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007abe0d930> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007abe4f398> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(
> StreamIterationHead.java:81)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #459 daemon prio=5 os_prio=31
> tid=0x00007f9c9f0a6000 nid=0x11a0f in Object.wait() [0x0000700005fe8000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ac22bfe8> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSource-10 (3/4)" #458 daemon prio=5 os_prio=31
> tid=0x00007f9c9c0dd000 nid=0x1200f in Object.wait() [0x0000700005ee5000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ac238120> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ac27c5e0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(
> StreamIterationHead.java:81)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #457 daemon prio=5 os_prio=31
> tid=0x00007f9c9c104000 nid=0x1181b in Object.wait() [0x0000700005de2000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ac4184d0> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSource-10 (2/4)" #456 daemon prio=5 os_prio=31
> tid=0x00007f9c9d867000 nid=0xa61b in Object.wait() [0x0000700005cdf000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ac4269e0> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ac46b850> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(
> StreamIterationHead.java:81)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "CloseableReaperThread" #455 daemon prio=5 os_prio=31
> tid=0x00007f9c9f80e800 nid=0x15e0f in Object.wait() [0x0000700005bdc000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000007ac801800> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at org.apache.flink.core.fs.SafetyNetCloseableRegistry$
> CloseableReaperThread.run(SafetyNetCloseableRegistry.java:145)
>
> "IterationSource-10 (1/4)" #454 daemon prio=5 os_prio=31
> tid=0x00007f9c9cabd800 nid=0xb20f in Object.wait() [0x0000700005ad9000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
> - locked <0x00000007ac80dcd8> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:138)
> at org.apache.flink.runtime.io.network.api.writer.
> RecordWriter.sendToTarget(RecordWriter.java:131)
> - locked <0x00000007ac873cd0> (a org.apache.flink.runtime.io.
> network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:88)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:86)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:72)
> at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(
> StreamIterationHead.java:81)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
> "Attach Listener" #403 daemon prio=9 os_prio=31 tid=0x00007f9c9c271800
> nid=0xfa3f waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "flink-akka.actor.default-dispatcher-21" #402 daemon prio=5 os_prio=31
> tid=0x00007f9c99f9f800 nid=0x14597 waiting on condition [0x00007000056cd000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-20" #401 daemon prio=5 os_prio=31
> tid=0x00007f9ca098c000 nid=0x13dab waiting on condition [0x00007000055ca000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(
> ForkJoinPool.java:2135)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-19" #184 daemon prio=5 os_prio=31
> tid=0x00007f9ca0583800 nid=0xf807 waiting on condition [0x00007000053c4000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-18" #183 daemon prio=5 os_prio=31
> tid=0x00007f9c9f8b9000 nid=0xde13 waiting on condition [0x00007000052c1000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-17" #182 daemon prio=5 os_prio=31
> tid=0x00007f9ca013f800 nid=0xfc17 waiting on condition [0x00007000051be000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-16" #175 daemon prio=5 os_prio=31
> tid=0x00007f9c9f8ea800 nid=0xec07 waiting on condition [0x00007000058d3000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-15" #174 daemon prio=5 os_prio=31
> tid=0x00007f9c9f851000 nid=0xee07 waiting on condition [0x00007000054c7000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "Flink-MetricRegistry-1" #47 prio=5 os_prio=31 tid=0x00007f9c9f026000
> nid=0x1407 waiting on condition [0x00007000050bb000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000000078fd306e8> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
> ScheduledThreadPoolExecutor.java:1093)
> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
> ScheduledThreadPoolExecutor.java:809)
> at java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1127)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "Timer-0" #46 daemon prio=5 os_prio=31 tid=0x00007f9c9c016800 nid=0x9803
> in Object.wait() [0x0000700004fb8000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.util.TimerThread.mainLoop(Timer.java:552)
> - locked <0x000000078fdc3fd8> (a java.util.TaskQueue)
> at java.util.TimerThread.run(Timer.java:505)
>
> "Hashed wheel timer #1" #22 daemon prio=5 os_prio=31
> tid=0x00007f9c9b1fe000 nid=0x9603 waiting on condition [0x0000700004eb5000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(
> HashedWheelTimer.java:483)
> at org.jboss.netty.util.HashedWheelTimer$Worker.run(
> HashedWheelTimer.java:392)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at java.lang.Thread.run(Thread.java:745)
>
> "flink-akka.actor.default-dispatcher-14" #43 daemon prio=5 os_prio=31
> tid=0x00007f9c9c8b9000 nid=0x9403 waiting on condition [0x0000700004db2000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "IOManager reader thread #1" #41 daemon prio=5 os_prio=31
> tid=0x00007f9c9a47e000 nid=0x9203 waiting on condition [0x0000700004caf000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000000078feef2c0> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
> at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$
> ReaderThread.run(IOManagerAsync.java:380)
>
> "IOManager writer thread #1" #40 daemon prio=5 os_prio=31
> tid=0x00007f9c9b17c800 nid=0x9003 waiting on condition [0x0000700004bac000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000000078fd30b90> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
> at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$
> WriterThread.run(IOManagerAsync.java:486)
>
> "Flink KvStateServer EventLoop Thread 0" #27 daemon prio=5 os_prio=31
> tid=0x00007f9c9d26a800 nid=0x8e03 runnable [0x0000700004aa9000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c007d2b0> (a io.netty.channel.nio.
> SelectedSelectionKeySet)
> - locked <0x00000005c007c4e0> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c007c400> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> "Flink Netty Server (0) Thread 0" #35 daemon prio=5 os_prio=31
> tid=0x00007f9c9d435800 nid=0x8c07 runnable [0x00007000049a6000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c00c43d8> (a io.netty.channel.nio.
> SelectedSelectionKeySet)
> - locked <0x00000005c00c4320> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c00c4240> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O server boss #6" #26 daemon prio=5 os_prio=31
> tid=0x00007f9c9a46e000 nid=0x8a03 runnable [0x00007000048a3000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c01770e0> (a sun.nio.ch.Util$3)
> - locked <0x00000005c01770f0> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c0177090> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
> at org.jboss.netty.channel.socket.nio.NioServerBoss.
> select(NioServerBoss.java:163)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.NioServerBoss.run(
> NioServerBoss.java:42)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O worker #5" #25 daemon prio=5 os_prio=31 tid=0x00007f9c9d2b6000
> nid=0x8803 runnable [0x00007000047a0000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c01f4aa0> (a sun.nio.ch.Util$3)
> - locked <0x00000005c01f4ab0> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c01f4a50> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.jboss.netty.channel.socket.nio.SelectorUtil.
> select(SelectorUtil.java:68)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(
> AbstractNioSelector.java:415)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.
> run(AbstractNioWorker.java:89)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O worker #4" #24 daemon prio=5 os_prio=31 tid=0x00007f9c9d2d2000
> nid=0x8603 runnable [0x000070000469d000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c016e278> (a sun.nio.ch.Util$3)
> - locked <0x00000005c016e288> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c016e228> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.jboss.netty.channel.socket.nio.SelectorUtil.
> select(SelectorUtil.java:68)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(
> AbstractNioSelector.java:415)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.
> run(AbstractNioWorker.java:89)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O boss #3" #23 daemon prio=5 os_prio=31 tid=0x00007f9c9d2d1000
> nid=0x8403 runnable [0x000070000459a000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c017e048> (a sun.nio.ch.Util$3)
> - locked <0x00000005c017e058> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c017dff8> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.jboss.netty.channel.socket.nio.SelectorUtil.
> select(SelectorUtil.java:68)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(
> AbstractNioSelector.java:415)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.NioClientBoss.run(
> NioClientBoss.java:42)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O worker #2" #21 daemon prio=5 os_prio=31 tid=0x00007f9c9d2b0000
> nid=0x8203 runnable [0x0000700004497000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c0180070> (a sun.nio.ch.Util$3)
> - locked <0x00000005c0180080> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c0180020> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.jboss.netty.channel.socket.nio.SelectorUtil.
> select(SelectorUtil.java:68)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(
> AbstractNioSelector.java:415)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.
> run(AbstractNioWorker.java:89)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "New I/O worker #1" #20 daemon prio=5 os_prio=31 tid=0x00007f9c9d2af800
> nid=0x8003 runnable [0x0000700004394000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000005c01605c0> (a sun.nio.ch.Util$3)
> - locked <0x00000005c01605d0> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000005c0160570> (a sun.nio.ch.KQueueSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.jboss.netty.channel.socket.nio.SelectorUtil.
> select(SelectorUtil.java:68)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(
> AbstractNioSelector.java:415)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
> AbstractNioSelector.java:212)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.
> run(AbstractNioWorker.java:89)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(
> ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(
> DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> "flink-akka.remote.default-remote-dispatcher-6" #19 daemon prio=5
> os_prio=31 tid=0x00007f9c9d29d800 nid=0x7e03 waiting on condition
> [0x0000700004291000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c0160b80> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(
> ForkJoinPool.java:2135)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.remote.default-remote-dispatcher-5" #18 daemon prio=5
> os_prio=31 tid=0x00007f9c9b111000 nid=0x7c03 waiting on condition
> [0x000070000418e000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c0160b80> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-4" #17 daemon prio=5 os_prio=31
> tid=0x00007f9c9d299000 nid=0x7a03 waiting on condition [0x000070000408b000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-3" #16 daemon prio=5 os_prio=31
> tid=0x00007f9c9a45a000 nid=0x7803 waiting on condition [0x0000700003f88000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-akka.actor.default-dispatcher-2" #15 daemon prio=5 os_prio=31
> tid=0x00007f9c9d298000 nid=0x7603 waiting on condition [0x0000700003e85000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c01773a0> (a akka.dispatch.
> ForkJoinExecutorConfigurator$AkkaForkJoinPool)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> "flink-scheduler-1" #14 daemon prio=5 os_prio=31 tid=0x00007f9c9ab1b000
> nid=0x7407 waiting on condition [0x0000700003d82000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:231)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.
> scala:411)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
>
> "Service Thread" #10 daemon prio=9 os_prio=31 tid=0x00007f9c9b037800
> nid=0x6d03 runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C1 CompilerThread3" #9 daemon prio=9 os_prio=31 tid=0x00007f9c9a120800
> nid=0x6b03 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread2" #8 daemon prio=9 os_prio=31 tid=0x00007f9c9a11f800
> nid=0x6903 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread1" #7 daemon prio=9 os_prio=31 tid=0x00007f9c99f72800
> nid=0x6703 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread0" #6 daemon prio=9 os_prio=31 tid=0x00007f9c99f72000
> nid=0x6503 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" #5 daemon prio=9 os_prio=31 tid=0x00007f9c99f71000
> nid=0x6303 runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=31
> tid=0x00007f9c9a11f000 nid=0x6003 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f9c9b035800 nid=0x5103
> in Object.wait() [0x0000700003467000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000005c0177830> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>
> "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f9c99f63000
> nid=0x4f03 in Object.wait() [0x0000700003364000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
> - locked <0x00000005c0177820> (a java.lang.ref.Reference$Lock)
> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
>
> "main" #1 prio=5 os_prio=31 tid=0x00007f9c99808800 nid=0x1c03 waiting on
> condition [0x0000700001d21000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000005c02ebd98> (a java.util.concurrent.
> CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.
> parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.
> doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.
> acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at akka.actor.ActorSystemImpl$TerminationCallbacks.ready(
> ActorSystem.scala:819)
> at akka.actor.ActorSystemImpl$TerminationCallbacks.ready(
> ActorSystem.scala:788)
> at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
> at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(
> BlockContext.scala:53)
> at scala.concurrent.Await$.ready(package.scala:169)
> at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:644)
> at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:645)
> at org.apache.flink.runtime.taskmanager.TaskManager$.
> runTaskManager(TaskManager.scala:1805)
> at org.apache.flink.runtime.taskmanager.TaskManager$.
> selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1639)
> at org.apache.flink.runtime.taskmanager.TaskManager$$anon$
> 2.call(TaskManager.scala:1548)
> at org.apache.flink.runtime.taskmanager.TaskManager$$anon$
> 2.call(TaskManager.scala:1546)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
> at org.apache.flink.runtime.taskmanager.TaskManager$.main(
> TaskManager.scala:1546)
> at org.apache.flink.runtime.taskmanager.TaskManager.main(
> TaskManager.scala)
>
> "VM Thread" os_prio=31 tid=0x00007f9c9a108800 nid=0x4d03 runnable
>
> "Gang worker#0 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b003800
> nid=0x2503 runnable
>
> "Gang worker#1 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b004000
> nid=0x2703 runnable
>
> "Gang worker#2 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9a802000
> nid=0x2903 runnable
>
> "Gang worker#3 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b005000
> nid=0x2b03 runnable
>
> "Gang worker#4 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b005800
> nid=0x2d03 runnable
>
> "Gang worker#5 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b006000
> nid=0x2f03 runnable
>
> "Gang worker#6 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9b006800
> nid=0x3103 runnable
>
> "Gang worker#7 (Parallel GC Threads)" os_prio=31 tid=0x00007f9c9a802800
> nid=0x3303 runnable
>
> "G1 Main Concurrent Mark GC Thread" os_prio=31 tid=0x00007f9c9b011000
> nid=0x4703 runnable
>
> "Gang worker#0 (G1 Parallel Marking Threads)" os_prio=31
> tid=0x00007f9c9a008000 nid=0x4903 runnable
>
> "Gang worker#1 (G1 Parallel Marking Threads)" os_prio=31
> tid=0x00007f9c9b011800 nid=0x4b03 runnable
>
> "G1 Concurrent Refinement Thread#0" os_prio=31 tid=0x00007f9c9a808800
> nid=0x4503 runnable
>
> "G1 Concurrent Refinement Thread#1" os_prio=31 tid=0x00007f9c9a808000
> nid=0x4303 runnable
>
> "G1 Concurrent Refinement Thread#2" os_prio=31 tid=0x00007f9c9a807000
> nid=0x4103 runnable
>
> "G1 Concurrent Refinement Thread#3" os_prio=31 tid=0x00007f9c9a806800
> nid=0x3f03 runnable
>
> "G1 Concurrent Refinement Thread#4" os_prio=31 tid=0x00007f9c9a805800
> nid=0x3d03 runnable
>
> "G1 Concurrent Refinement Thread#5" os_prio=31 tid=0x00007f9c9a805000
> nid=0x3b03 runnable
>
> "G1 Concurrent Refinement Thread#6" os_prio=31 tid=0x00007f9c9a804000
> nid=0x3903 runnable
>
> "G1 Concurrent Refinement Thread#7" os_prio=31 tid=0x00007f9c99811800
> nid=0x3703 runnable
>
> "G1 Concurrent Refinement Thread#8" os_prio=31 tid=0x00007f9c9a803800
> nid=0x3503 runnable
>
> "VM Periodic Task Thread" os_prio=31 tid=0x00007f9c9a115800 nid=0x6f03
> waiting on condition
>
> JNI global references: 330
>
>
> Best Regards
> Andrey
>
>
>

Mime
View raw message