flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink 0.6 hang up
Date Thu, 04 Sep 2014 16:49:10 GMT
The solution suggested above would work for the SuperstepKickoffLatch, but
unfortunately not for the other broker structures.

So the problem does persist in other cases.

A correct solution would be to prevent backpressure on forking flows, which
will come as part of the network-stack rewrite. Since that solves it
properly, I would prefer to go for that solution.
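
The pattern behind the hang can be sketched with plain `java.util.concurrent` types. This is not Flink's actual Broker code, just a minimal self-contained illustration of the hand-off it performs: an intermediate task blocks on a queue slot that only the iteration head fills, so if the head is never scheduled, the consumer parks forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrokerHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the broker slot the intermediate task reads from.
        ArrayBlockingQueue<String> broker = new ArrayBlockingQueue<>(1);

        // Healthy case: the iteration head runs and publishes,
        // so the consumer unblocks.
        new Thread(() -> broker.offer("superstep-kickoff")).start();
        System.out.println("head started: " + broker.poll(1, TimeUnit.SECONDS));

        // Failure case: no head thread is ever started. The bounded poll()
        // here stands in for the unbounded take() in the real stack trace;
        // with take() this lookup would park forever.
        ArrayBlockingQueue<String> orphaned = new ArrayBlockingQueue<>(1);
        System.out.println("head missing: "
                + orphaned.poll(100, TimeUnit.MILLISECONDS));
    }
}
```

With `take()` in place of the bounded `poll()`, the second lookup parks exactly like the `Broker.get` frame in the stack trace quoted below.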

A temporary workaround would be the following:
 - Find the data sets that are consumed both inside and outside the
iteration. Those are typically preprocessed matrices or similar.
 - Duplicate the code that produces them, so that you have two different
subprograms producing two different data sets.
 - Use one data set inside the iteration and the other outside the iteration.
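
In plain Java terms (not the Flink API, to keep the sketch self-contained), the workaround amounts to invoking the producing subprogram twice instead of sharing its result. `buildPreprocessedMatrix` below is a hypothetical stand-in for whatever preprocessing produces the shared data set; the point is only the structure: two independent invocations, one result per consumer.

```java
import java.util.Arrays;
import java.util.List;

public class DuplicateDataSetSketch {
    // Hypothetical stand-in for the subprogram that builds the data set
    // consumed both inside and outside the iteration.
    static List<double[]> buildPreprocessedMatrix() {
        return Arrays.asList(new double[]{1.0, 2.0}, new double[]{3.0, 4.0});
    }

    public static void main(String[] args) {
        // Instead of one shared data set feeding both the iteration and
        // the outside consumer (the forking flow that can deadlock),
        // run the producing code twice:
        List<double[]> insideIteration = buildPreprocessedMatrix();
        List<double[]> outsideIteration = buildPreprocessedMatrix();

        // Each consumer now reads its own independent data set.
        System.out.println(insideIteration.size() + " rows inside, "
                + outsideIteration.size() + " rows outside");
    }
}
```

The cost is that the preprocessing runs twice, but it removes the forked consumption until the network-stack rewrite lands.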

Stephan




On Thu, Sep 4, 2014 at 6:14 PM, Márton Balassi <balassi.marton@gmail.com>
wrote:

> Thanks, Ufuk found the relevant part in the stacktrace:
>
> "Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10 tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007d2668ea0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
>         at org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
>         at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
>         at java.lang.Thread.run(Thread.java:744)
>
> This part waits for the iteration head, which has not been started yet,
> and thus induces a deadlock.
>
> Opened a JIRA issue on it:
> https://issues.apache.org/jira/browse/FLINK-1088
>
> Thanks for the quick response by the way!
>
>
> On Thu, Sep 4, 2014 at 5:39 PM, Fabian Hueske <fhueske@apache.org> wrote:
>
> > Hi Marton,
> >
> > A jstack Java stack trace can help identify where the code got stuck.
> > Can you open a JIRA and post the stack trace there?
> >
> > Cheers, Fabian
> >
> >
> > 2014-09-04 17:25 GMT+02:00 Márton Balassi <balassi.marton@gmail.com>:
> >
> > > CPU load:
> > >
> > > Tested on my 4-core machine, the CPU load spikes at the beginning of
> > > the job and stays relatively high throughout when run with version 0.5,
> > > and the job then finishes gracefully. On version 0.6 it works seemingly
> > > well until the hangup. Interestingly enough, even when no more log
> > > messages appear, my CPU utilization stays 10-15% higher per core than
> > > without running the job.
> > >
> > > logs:
> > >
> > > For both implementations it starts like this:
> > >
> > > 09/04/2014 17:05:51: Job execution switched to status SCHEDULED
> > > 09/04/2014 17:05:51: DataSource (CSV Input (|) /home/mbalassi/git/als-comparison/data/sampledb2b.csv.txt) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Reduce(Create q as a random matrix) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the rows of p with multiple keys)) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed p calculates optimal q) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Fake Tail (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > >
> > > [Omitted quite some healthy messages...]
> > >
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to READY
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to STARTING
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to READY
> > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to READY
> > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to STARTING
> > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to STARTING
> > > 09/04/2014 17:05:54: CoGroup (For fixed p calculates optimal q) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: Fake Tail (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to READY
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to STARTING
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to READY
> > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to STARTING
> > > 09/04/2014 17:05:55: CoGroup (For fixed q calculates optimal p) (1/1) switched to RUNNING
> > >
> > > Flink stops here, Strato continues:
> > >
> > > 09/04/2014 17:09:01: DataSource(CSV Input (|)) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:03: Reduce(Create q as a random matrix) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:09: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to READY
> > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to STARTING
> > >
> > > [Omitted quite some healthy messages...]
> > >
> > > 09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: Job execution switched to status FINISHED
> > >
> > > On Thu, Sep 4, 2014 at 3:33 PM, Ufuk Celebi <uce@apache.org> wrote:
> > >
> > > > Hey Marton,
> > > >
> > > > thanks for reporting the issue and the link to the repo to reproduce
> > the
> > > > problem. I will look into it later today.
> > > >
> > > > If you like, you could provide some more information in the meantime:
> > > >
> > > > - How is the CPU load?
> > > > - What are the TM logs saying?
> > > > - Can you give a stack trace? Where is it hanging?
> > > >
> > > >
> > > >
> > > > On Thu, Sep 4, 2014 at 3:14 PM, Márton Balassi <balassi.marton@gmail.com> wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > We managed to produce code for which the legacy Stratosphere 0.5
> > > > > release implementation works nicely; however, the updated Flink 0.6
> > > > > release implementation hangs for slightly larger inputs.
> > > > >
> > > > >
> > > > > Please check out the issue here:
> > > > > https://github.com/mbalassi/als-comparison
> > > > >
> > > > > Any suggestions are welcome.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Marton
> > > > >
> > > >
> > >
> >
>
