flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [Gelly]Distributed Minimum Spanning Tree Example
Date Sat, 14 Feb 2015 20:17:55 GMT
Hi Andra,

I haven't had a detailed look at Gelly and its functions, but Flink has
only a few operators that can cause non-deterministic behavior.
In general, user code should be implemented without side effects, i.e.,
the result of each function call may only depend on its arguments. This
principle gives Flink the freedom to perform function calls in any order on
any machine.
The only built-in exception is the mapPartition operator, which receives a
whole partition as input; the partitions themselves are not computed
deterministically and depend on input split assignment and the optimizer's
strategy choices.
Another source of non-deterministic results is incorrect semantic
properties, which can make the optimizer believe that data is already
sorted or partitioned when it is not. In the case of your example, an
explicit rebalance operation could reset these assumed properties and force
the optimizer to deterministically reorganize the data.

I would have a look at the execution plans for both variants (the
non-deterministic one and the deterministic one).
You can get them as a JSON string by calling
ExecutionEnvironment.getExecutionPlan().
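A minimal sketch of dumping the plan (the program body is a placeholder;
any job with at least one sink will do, and getExecutionPlan() is called
instead of execute()):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PlanDump {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder job: replace with the variant you want to inspect.
        DataSet<Long> numbers = env.generateSequence(1, 100);
        numbers.writeAsText("/tmp/numbers");

        // Prints the optimizer's plan as a JSON string; comparing the
        // output for both variants shows where their strategies diverge.
        System.out.println(env.getExecutionPlan());
    }
}
```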

Best, Fabian





2015-02-14 18:55 GMT+01:00 Andra Lungu <lungu.andra@gmail.com>:

> Hey guys,
>
> As I previously said, I have had some problems getting this DMST algorithm
> to be fully functional, either with Flink 0.8 or with Flink 0.9.
>
> My latest problem (and I have been debugging this for quite a few days) was
> that for the test I wrote, which extends MultipleProgramsTestBase, the
> Execution mode = CLUSTER test had a very non-deterministic behaviour (i.e.
> each time it produced a different number of actual lines, which were,
> obviously, not equal to the number of expected lines).
>
>
> The function that caused problems has the following header:
>
> public DataSet<Tuple5<Long, Long, Double, Long, Long>>
> updateRootIdsForRealEdges(
>         DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
>         DataSet<Vertex<Long, Long>> verticesWithRootIDs) {
>
> After some classical "printf debugging", I saw that the
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> edges
>
> did not always act, when a join was performed on it, as if it had all the
> values you would expect it to have.
>
> The code snippet that solved the problem raises some questions for me:
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
>         edges.map(new MapFunction<Tuple5<Long, Long, Double, Long, Long>,
>                 Tuple5<Long, Long, Double, Long, Long>>() {
>             @Override
>             public Tuple5<Long, Long, Double, Long, Long> map(
>                     Tuple5<Long, Long, Double, Long, Long> edge)
>                     throws Exception {
>                 // Identity map: forwards each tuple unchanged.
>                 return edge;
>             }
>         });
>
>  or
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
> edges.rebalance();
>
> and then the join would be performed on those rebalancedEdges.
> And now the test passes (with either of the two solutions).
>
> So the question is:
> *Why* is this happening? *Is this normal?*
> Maybe it has something to do with the context, but then how come a simple
> map fixes everything?
>
> I am sorry if this may seem like one of those "Why is the sky blue?"
> questions, but I am here to learn :D
>
> Thank you!
> Andra
>
