flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Miguel Coimbra <miguel.e.coim...@gmail.com>
Subject Re: How to perform efficient DataSet reuse between iterations
Date Tue, 28 Nov 2017 23:44:21 GMT

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration
of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single
job takes to execute, but what if I want to know the time taken for
specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting
elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel
or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's
In a way that I could:

a) just get the time of a single task (if running serially) to get the
total operator execution time;
b) know the time taken by each parallel component of the operator's
execution so I could know where and what was the "lagging element" in the
operator's execution.

Is this possible? I was hoping I could retrieve this information in the
Java program itself and avoid processing logs.

Thanks again.

Best regards,

Miguel E. Coimbra
Email: miguel.e.coimbra@gmail.com <miguel.e.coimbra@ist.utl.pt>
Skype: miguel.e.coimbra

On 28 November 2017 at 08:56, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi,
> by calling result.count(), you compute the complete plan from the
> beginning and not just the operations you added since the last execution.
> Looking at the output you posted, each step takes about 15 seconds (with
> about 5 secs of initialization).
> So the 20 seconds of the first step include initialization + 1st step.
> The 35 seconds on the second step include initialization, 1st step + 2nd
> step.
> If you don't call count on the intermediate steps, you can compute the 4th
> step in 65 seconds.
> Implementing a caching operator would be a pretty huge effort because you
> need to touch code at many places such as the API, optimizer, runtime,
> scheduling, etc.
> The documentation you found should still be applicable. There hasn't been
> major additions to the DataSet API and runtime in the last releases.
> Best, Fabian
> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coimbra@gmail.com>:
>> Hello Fabian,
>> Thank you for the reply.
>> I was hoping the situation had in fact changed.
>> As far as I know, I am not calling execute() directly even once - it is
>> being called implicitly by simple DataSink elements added to the plan
>> through count():
>> System.out.println(String.format("%d-th graph algorithm produced %d
>> elements. (%d.%d s).",
>>                             executionCounter,
>>                             *result.count()*, // this would trigger
>> execution...
>>                             env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.SECONDS),
>>                             env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>> I have taken a look at Flink's code base (e.g. how the dataflow dag is
>> processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>> sure on the most direct way to achieve this.
>> Perhaps I missed some online documentation that would help to get a grip
>> on how to contribute to the different parts of Flink?
>> I did find some information which hints at implementing this sort of
>> thing (such as adding custom operators) but it was associated to an old
>> version of Flink:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> internals/add_operator.html
>> However, as far as I know there is no equivalent page in the current
>> online stable or snapshot documentation.
>> What would be the best way to go about this?
>> It really seems that the DataSet stored in the result variable is always
>> representing an increasing sequence of executions and not just the results
>> of the last execution.
>> Best regards,
>> Miguel E. Coimbra
>> Email: miguel.e.coimbra@gmail.com <miguel.e.coimbra@ist.utl.pt>
>> Skype: miguel.e.coimbra
>> On 27 November 2017 at 22:56, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Miguel,
>>> I'm sorry but AFAIK, the situation has not changed.
>>> Is it possible that you are calling execute() multiple times?
>>> In that case, the 1-st and 2-nd graph would be recomputed before the
>>> 3-rd graph is computed.
>>> That would explain the increasing execution time of 15 seconds.
>>> Best, Fabian
>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coimbra@gmail.com>:
>>>> Hello,
>>>> I'm facing a problem in an algorithm where I would like to constantly
>>>> update a DataSet representing a graph, perform some computation,
>>>> output one or more DataSink (such as a file on the local system) and
>>>> then reuse the DataSet for a next iteration.
>>>> ​I want to avoid spilling the results to disk at the end of an
>>>> iteration and to read it back in the next iterations - the graph is very
>>>> big and I do not wish to incur that time overhead.
>>>> I want to reuse the full result DataSet of each iteration in the next
>>>> one and I want to save to disk a small percentage of the produced
>>>> DataSet for each iteration.
>>>> The space complexity is rather constant - the number of edges in the
>>>> graph increases by only 100 between iterations (which is an extremely low
>>>> percentage of the original graph's edges) and is obtained using
>>>> env.fromCollection(edgesToAdd).
>>>> Although I am using Flink's Gelly API for graphs, I have no problem
>>>> working directly with the underlying vertex and edge DataSet elements.​
>>>> Two ways to do this occur to me, but it seems both are currently not
>>>> supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion
>>>> [1]:
>>>> «​*​*
>>>> *Unfortunately, it is not currently possible to output intermediate
>>>> results from a bulk iteration.You can only output the final result at the
>>>> end of the iteration.Also, as you correctly noticed, Flink cannot
>>>> efficiently unroll a while-loop or for-loop, so that won't work either.»*
>>>> *1.* I thought I could create a bulk iteration, perform the
>>>> computation and between iterations, output the result to the file system.
>>>> However, this is not possible, as per Vasia's answer, and produces the
>>>> following exception on execution when I try (for example, to calculate a
>>>> centrality metric for every vertex and dump the results to disk), as
>>>> expected based on that information:
>>>> org.apache.flink.api.common.InvalidProgramException: A data set that
>>>> is part of an iteration was used as a sink or action. Did you forget to
>>>> close the iteration?
>>>> *2.* Using a for loop in my own program and triggering sequential
>>>> Flink job executions.
>>>> Problem: in this scenario, while I am able to use a DataSet produced
>>>> in an iteration's Flink job (and dump the desired output information to
>>>> disk) and pass it to the next Flink job, the computation time increases
>>>> constantly:
>>>> (I also tried manually starting a session which is kept open with
>>>> env.startNewSession() before the loop - no impact)
>>>> ​​
>>>> Initial graph has 33511 vertices and 411578 edges.
>>>> Added 113 vertices and 100 edges.
>>>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>>>> 1-th graph algorithm produced 33524 elements. *(20.96 s)*.
>>>> Added 222 vertices and 200 edges.
>>>> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
>>>> 2-th graph algorithm produced 33536 elements. *(35.913 s)*.
>>>> Added 326 vertices and 300 edges.
>>>> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
>>>> 3-th graph algorithm produced 33543 elements. *(49.624 s)*.
>>>> Added 436 vertices and 400 edges.
>>>> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
>>>> 4-th graph algorithm produced 33557 elements. *(66.209 s)*.
>>>> Note that the number of elements in the output DataSet is equal to the
>>>> number of vertices in the graph.
>>>> On iteration i in my program, the executed graph algorithm
>>>> incorporates the result DataSet of iteration i - 1 by means of the
>>>> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction())
>>>> function.
>>>> The VertexJoinFunction is a simple forwarding mechanism to set the
>>>> previous values:
>>>> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
>>>> private static class RanksJoinFunction implements
>>>> VertexJoinFunction<Double, Double> {
>>>>     @Override
>>>>     public Double vertexJoin(final Double vertexValue, final Double
>>>> inputValue) throws Exception {
>>>>         return inputValue;
>>>>     }
>>>> }
>>>> ​I have also used Flink's plan visualizer to check for discrepancies
>>>> between the first iteration and the tenth (for example), but the layout of
>>>> the plan remains exactly the same while the execution time continually
>>>> increases for what should be the same amount of computations.
>>>> *Bottom-line:* ​I was hoping someone could tell me how to overcome the
>>>> performance bottleneck using the sequential job approach or enabling the
>>>> output of intermediate results using Flink's Bulk Iterations.
>>>> ​​​I believe others have stumbled upon this limitation before [2, 3].​​
>>>> I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit
>>>> using a local environment:
>>>> final Configuration conf = new Configuration();
>>>> final LocalEnvironment lenv = (LocalEnvironment)
>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>> final ExecutionEnvironment env = lenv;
>>>> env.getConfig().disableSysoutLogging().setParallelism(1);
>>>> ​I wish to ​execute in a cluster later on with a bigger dataset, so it
>>>> would be essential that to maximize the ability to reuse the DataSets that
>>>> are distributed by the Flink runtime.
>>>> This would allow me to avoid the performance bottleneck that I
>>>> described.
>>>> ​Hopefully someone may shed light on this.​
>>>> ​Thanks for your attention.​
>>>> ​References:​
>>>> ​[1] https://stackoverflow.com/questions/37224140/possibility-of-
>>>> saving-partial-outputs-from-bulk-iteration-in-flink-dataset/
>>>> 37352770#37352770​
>>>> ​[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mb
>>>> ox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB12
>>>> 28.namprd06.prod.outlook.com%3E​
>>>> ​[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>> ble.com/Intermediate-output-during-delta-iterations-td436.html​
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coimbra@gmail.com <miguel.e.coimbra@ist.utl.pt>
>>>> Skype: miguel.e.coimbra

View raw message