flink-issues mailing list archives

From "Nico Kruber (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (FLINK-9242) LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
Date Tue, 24 Apr 2018 11:14:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber closed FLINK-9242.
------------------------------
    Resolution: Duplicate

I suspect this is a duplicate of FLINK-9144, for which a fix was merged on April 19. That fix
corrects backlog counting while spilling to disk: the operator was reporting a backlog count
that was too high, so the following operator probably waits for data which is simply not there.
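
To illustrate the suspected mechanism, here is a minimal toy model in plain Java. It is only a
sketch and not Flink's actual code: the class BacklogSketch and its methods are hypothetical
names invented for this example. A producer announces how many buffers it has queued; if it
announces more than it really queued, the consumer still believes data is outstanding and
waits for a buffer that never arrives.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model only: hypothetical names, NOT Flink's network stack. */
final class BacklogSketch {
    private final Queue<String> buffers = new ArrayDeque<>();
    private int announcedBacklog; // what the producer claims is queued

    /** Producer side: enqueue one buffer but announce `claimed` buffers of backlog. */
    synchronized void publish(String buffer, int claimed) {
        buffers.add(buffer);
        announcedBacklog += claimed;
        notifyAll();
    }

    /** Consumer side: take the next buffer, or return null if none arrives in time. */
    synchronized String poll(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (buffers.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null; // timed out; a wait() without a timeout would block forever
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        announcedBacklog--;
        return buffers.remove();
    }

    /** Buffers the consumer still expects according to the announced backlog. */
    synchronized int outstanding() {
        return announcedBacklog;
    }

    /** True if the consumer is left expecting data that never arrives. */
    static boolean consumerStalls(int claimedPerBuffer) {
        BacklogSketch channel = new BacklogSketch();
        channel.publish("record-1", claimedPerBuffer);
        channel.poll(50); // consumes the only real buffer
        return channel.outstanding() > 0 && channel.poll(50) == null;
    }
}
```

In this sketch a correct count (one announced per buffer) leaves nothing outstanding, while an
overcount leaves the consumer expecting a buffer that was never produced. The timeout exists
only so the example terminates.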

I assume (from your original mailing list post from April 15) that you may have been testing
without that fix. Can you please verify that your problem still exists with the latest snapshot
version? If you already tested with a more recent snapshot, please feel free to re-open this
issue.

> LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-9242
>                 URL: https://issues.apache.org/jira/browse/FLINK-9242
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>    Affects Versions: 1.5.0, 1.6.0
>         Environment: *SETUP 1*
> - Windows 7 Pro x64
> - Java 1.8.0_162 x64
> - 8 GB RAM
> - Intel i7 620M
> *SETUP 2*
> - Slackware 14.2 x64 GNU/Linux 4.4.88
> - Java openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> - 256 GB RAM
> - 8x Intel(R) Xeon(R) CPU E7- 4830
>            Reporter: Miguel E. Coimbra
>            Priority: Major
>         Attachments: flink_debugging.PNG
>
>
> Hello,
> As per Fabian Hueske's advice on the mailing list, I am detailing the problem here.
>  This happens on my code in both 1.5-SNAPSHOT and 1.6-SNAPSHOT but not on 1.4.2 (stable).
>  I believe it might be some sort of regression which was introduced post 1.4.2.
> I'm getting different DataSet operators blocked on java.lang.Thread.State: WAITING for no apparent reason.
>  I only tested this using a LocalEnvironment which is created like so:
> {code:java}
> final Configuration conf = new Configuration();
> conf.setString("web.log.path", logPath);
> conf.setString("jobmanager.rpc.address", "127.0.0.1");
> conf.setString("web.port", "8081-9000");
> conf.setString("query.server.ports", "2000-30000");
> conf.setString("query.proxy.ports", "30001-60000");
> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> {code}
> (I also tried creating the LocalEnvironment without the web interface, and the problem still occurs.)
> I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and found that quite a few operator threads are stuck on java.lang.Thread.State: WAITING.
> I cannot share my code at the moment, but essentially I have a series of jobs, some of which use common data (I made sure it was written to disk in job _i_ and read back from disk in job _i + 1_).
> There are three major threads that I find to be in this waiting state.
> I'm running in local mode with a parallelism of one.
>  The thread dumps I obtained show me where the wait calls originated:
>  
> {code:java}
> Number 1:
> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 2:
> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 3:
> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
>  But if it were resource scarcity, I believe the program would terminate with an exception.
>  And if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
> I realize I didn't provide the user code that generates the execution plan for Flink, which led to the contexts in which the threads are waiting; my apologies. I will do so as soon as I get a chance.
> To highlight the symptoms:
>  - The memory assigned to the JVM is fully used, but there are no exceptions about lack of memory (and the system had plenty more memory available).
>  - The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
> I noticed something suspicious as well: I have chains of operators where the first operator ingests the expected number of records but does not emit any, leaving the following operator empty in a "RUNNING" state (see attached image).
> I think my scenario is somewhat complex, at least when compared to the samples in the Flink documentation. When visualizing the job plan, it is necessary to zoom in and out to check on specific parts of the execution scheme.
> Among the sequence of operations, I am:
> 1 - Creating a DataSet
> 2 - Using it as an initial workset in a DeltaIteration
> 2.1 - Joining the workset on each iteration with the edges of a graph
> 3 - Using the final solution set resulting from the DeltaIteration to build a graph and execute an algorithm over it (.run method).
> - The graph is not prohibitively big and I have a very low limit on the number of iterations (at most 4 or 5).
>  I will add more information as soon as it is available.
> It seems, however, that there is some sort of lack of synchronization occurring and perhaps the operators _become isolated_?
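
For readers who want to check for the same symptom without attaching a debugger, the following
is a minimal sketch that lists threads in the WAITING state from inside the JVM using the
standard java.lang.management API. The class name WaitingThreads is hypothetical and chosen
for this example; it is not part of Flink and was not used by the reporter.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reports live threads whose state is WAITING. */
final class WaitingThreads {
    /** Names of all live threads currently in Thread.State.WAITING. */
    static List<String> waitingThreadNames() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> names = new ArrayList<>();
        // false, false: skip lock-monitor and synchronizer details to keep the dump cheap
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.WAITING) {
                names.add(info.getThreadName());
            }
        }
        return names;
    }
}
```

Calling WaitingThreads.waitingThreadNames() periodically from a monitoring thread would show
whether operator threads such as the joins quoted above remain parked while CPU usage is at 0%.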



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
