flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Need to understand the execution model of the Flink
Date Mon, 19 Feb 2018 10:54:36 GMT
Hi,

that's a difficult question without knowing the details of your job.
A "No space left on device" error occurs when a file system is full.

This can happen if:
- A Flink algorithm writes to disk, e.g., an external sort or the hash
table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
or any other operation that needs to group or join data. Filters will
never spill to disk.
- An OutputFormat writes to disk.
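To make the difference concrete, here is a minimal external merge sort in plain Python. It is a textbook sketch, not Flink's sorter (Flink sorts in managed memory pages and spills through its own I/O manager), and `memory_budget` is a hypothetical stand-in for the memory a sorting task gets. Once the input exceeds that budget, sorted runs are written to temp files, which is the kind of disk usage that can fill up /tmp. A filter, by contrast, handles one record at a time and keeps no state.

```python
import heapq
import os
import pickle
import tempfile
from typing import Iterable, Iterator, List, Tuple


def _write_run(sorted_chunk: List[int], tmp_dir: str) -> str:
    """Spill one sorted run to a temp file and return its path."""
    with tempfile.NamedTemporaryFile(dir=tmp_dir, suffix=".run",
                                     delete=False) as f:
        for item in sorted_chunk:
            pickle.dump(item, f)
        return f.name


def _read_run(path: str) -> Iterator[int]:
    """Stream the records of a spilled run back from disk."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def external_sort(records: Iterable[int], memory_budget: int,
                  tmp_dir: str) -> Tuple[List[int], int]:
    """Sort `records` keeping at most `memory_budget` of them in memory.

    `memory_budget` is a hypothetical limit counted in records for simplicity.
    Returns the sorted records and the number of runs spilled to `tmp_dir`.
    """
    run_paths: List[str] = []
    buffer: List[int] = []
    for rec in records:
        buffer.append(rec)
        if len(buffer) >= memory_budget:  # memory full: spill a sorted run
            buffer.sort()
            run_paths.append(_write_run(buffer, tmp_dir))
            buffer = []
    buffer.sort()
    if not run_paths:
        return buffer, 0  # everything fit in memory: no disk I/O at all
    # k-way merge of all spilled runs plus the in-memory remainder
    merged = list(heapq.merge(buffer, *(_read_run(p) for p in run_paths)))
    for p in run_paths:
        os.remove(p)  # clean up the spill files
    return merged, len(run_paths)


def streaming_filter(records: Iterable[int], predicate) -> Iterator[int]:
    """A filter sees one record at a time and keeps no state: nothing to spill."""
    return (r for r in records if predicate(r))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as spill_dir:
        data, runs = external_sort([5, 3, 9, 1, 7, 2, 8], memory_budget=3,
                                   tmp_dir=spill_dir)
        print(data, "runs spilled:", runs)  # [1, 2, 3, 5, 7, 8, 9] runs spilled: 2
```

The spilled bytes grow with the input that reaches the sorting operator, so many such operators running at the same time on one machine add up.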

The data is written to a temp directory, which can be configured in the
./conf/flink-conf.yaml file.
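A quick way to see how much room the spill directories have is to check the free space of each configured directory. The sketch below assumes the setting holds a list separated by the OS path separator (':' on Linux). As far as I know, the key is `taskmanager.tmp.dirs` in Flink 1.4 and `io.tmp.dirs` in later releases, and it defaults to the system temp dir; please check the docs for your version. Listing several directories on different disks lets Flink spread its temp files across them.

```python
import os
import shutil
import tempfile
from typing import Dict


def temp_dir_free_space(tmp_dirs_value: str) -> Dict[str, int]:
    """Map each directory in a path-separator-delimited list to its free bytes.

    `tmp_dirs_value` is assumed to look like the temp-dir setting in
    flink-conf.yaml (taskmanager.tmp.dirs / io.tmp.dirs, depending on version),
    e.g. "/data1/flink-tmp:/data2/flink-tmp" on Linux.
    """
    free: Dict[str, int] = {}
    for entry in tmp_dirs_value.split(os.pathsep):
        path = entry.strip()
        if path:  # ignore empty entries, e.g. from a trailing separator
            free[path] = shutil.disk_usage(path).free
    return free


if __name__ == "__main__":
    # Default case: nothing configured, so the system temp dir (e.g. /tmp) is used.
    for path, free_bytes in temp_dir_free_space(tempfile.gettempdir()).items():
        print(f"{path}: {free_bytes / 2**30:.1f} GiB free")
```

Running it on each task manager while the job runs shows which machine's temp directory is filling up.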

Did you check how the tasks are distributed across the task managers?
The web UI can help to diagnose such problems.
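One possible reason (not confirmed in this thread) why only one task manager runs out of space is data skew. Before a GroupBy or Join, records are hash-partitioned by key, so all records with the same key end up in the same parallel instance, and that instance does all the spilling for them. The toy model below uses CRC32 as a stand-in for Flink's own hash function and a parallelism of 12 (6 task managers x 2 slots, as described in the thread).

```python
import zlib
from collections import Counter
from typing import Iterable, List


def partition_of(key: str, parallelism: int) -> int:
    """Pick the parallel instance for a key.

    CRC32 is an illustrative stand-in; Flink uses its own hash function.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def partition_sizes(keys: Iterable[str], parallelism: int) -> List[int]:
    """Count how many records each parallel instance would receive."""
    counts = Counter(partition_of(k, parallelism) for k in keys)
    return [counts[i] for i in range(parallelism)]


if __name__ == "__main__":
    parallelism = 6 * 2  # 6 task managers x 2 slots each
    # Hypothetical skewed input: one hot key plus 1000 distinct keys.
    keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]
    sizes = partition_sizes(keys, parallelism)
    print(sizes, "largest share:", max(sizes) / sum(sizes))
```

If the web UI shows one subtask receiving far more records than its siblings, the temp directory of the task manager running it is the one most likely to fill up.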

Best, Fabian

2018-02-19 11:22 GMT+01:00 Darshan Singh <darshan.meel@gmail.com>:

> Thanks Fabian for such detailed explanation.
>
> I am using a DataSet in between, so I guess the CSV is read once. Now to
> my real issue: I have 6 task managers, each with 4 cores, and I have 2
> slots per task manager.
>
> My CSV file is just 1 GB. I create a table, transform it to a DataSet,
> and then run 15 different filters plus extra processing, which all run
> almost in parallel.
>
> However, it fails with a "no space left on device" error on one of the
> task managers. Each task manager has 32 GB in /tmp, so I am not sure why
> it is running out of space. I do use some joins with other tables, but
> those are only a few megabytes.
>
> So I assumed that somehow all parallel executions were storing data in
> /tmp and filling it up.
>
> So I would like to know what could be filling up the space.
>
> Thanks
>
> On 19 Feb 2018 10:10 am, "Fabian Hueske" <fhueske@gmail.com> wrote:
>
> Hi,
>
> this works as follows.
>
> - Table API and SQL queries are translated into regular DataSet jobs
> (assuming you are running in a batch ExecutionEnvironment).
> - A query is translated into a sequence of DataSet operators when you 1)
> transform the Table into a DataSet or 2) write it to a TableSink. In both
> cases, the optimizer is invoked and recursively walks from the
> converted/emitted Table back to its roots, i.e., a TableSource or a
> DataSet.
>
> This means that if you create a Table from a TableSource, apply multiple
> filters to it, and write each filtered Table to a TableSink, then with 10
> filters the CSV file will be read 10 times, filtered 10 times, and written
> 10 times. This is not efficient, because you could just read the file once
> and apply all filters in parallel.
> You can do this by converting the Table that you read with a TableSource
> into a DataSet and registering the DataSet again as a Table. In that case,
> the translations of all TableSinks will stop at the DataSet and not include
> the TableSource that reads the file.
>
> The following figures illustrate the difference:
>
> 1) Without DataSet in the middle:
>
> TableSource -> Filter1 -> TableSink1
> TableSource -> Filter2 -> TableSink2
> TableSource -> Filter3 -> TableSink3
>
> 2) With DataSet in the middle:
>
>                         /-> Filter1 -> TableSink1
> TableSource -> DataSet -<-> Filter2 -> TableSink2
>                         \-> Filter3 -> TableSink3
>
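The per-sink translation can be modeled as a recursive walk from each sink back to its roots. In this plain-Python toy model (`Node`, `translate` and `materialized` are hypothetical names, not Flink classes), a node marked `materialized` plays the role of the Table that was converted to a DataSet and registered again: its first translation is reused, so later walks stop there instead of reaching the TableSource.

```python
from typing import Dict, List, Optional


class Node:
    """Hypothetical logical plan node (source, DataSet, filter or sink)."""

    def __init__(self, name: str, inputs: Optional[List["Node"]] = None,
                 materialized: bool = False) -> None:
        self.name = name
        self.inputs = inputs or []
        # True for a Table converted to a DataSet and re-registered as a Table:
        # its translation is reused instead of walking further back.
        self.materialized = materialized


def translate(node: Node, cache: Dict[int, str], counter: Dict[str, int]) -> str:
    """Recursively translate `node` and everything upstream of it."""
    if node.materialized and id(node) in cache:
        return cache[id(node)]  # already translated: stop the walk here
    for upstream in node.inputs:
        translate(upstream, cache, counter)
    counter[node.name] = counter.get(node.name, 0) + 1
    op_id = f"{node.name}#{counter[node.name]}"
    if node.materialized:
        cache[id(node)] = op_id
    return op_id


def count_source_scans(num_sinks: int, with_dataset: bool) -> int:
    """Translate one filter + sink per output and count source operators."""
    source = Node("CsvTableSource")
    root = Node("DataSet", [source], materialized=True) if with_dataset else source
    cache: Dict[int, str] = {}
    counter: Dict[str, int] = {}
    for i in range(num_sinks):
        # Each sink is translated on its own, independently of the others.
        sink = Node(f"TableSink{i}", [Node(f"Filter{i}", [root])])
        translate(sink, cache, counter)
    return counter["CsvTableSource"]


if __name__ == "__main__":
    print("without DataSet:", count_source_scans(10, with_dataset=False))
    print("with DataSet:   ", count_source_scans(10, with_dataset=True))
```

With 10 sinks the source is translated 10 times without the DataSet and once with it. In the Java/Scala Table API of Flink 1.4, the conversion corresponds, to my knowledge, to `BatchTableEnvironment.toDataSet(...)` followed by `registerDataSet(...)`.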
> I'll likely add a feature to internally translate an intermediate Table to
> make this a bit easier.
> The underlying problem is that the SQL optimizer cannot translate queries
> with multiple sinks.
> Instead, each sink is individually translated and the optimizer does not
> know that common execution paths could be shared.
>
> Best,
> Fabian
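For comparison, sharing common execution paths across sinks would require the optimizer to recognize identical sub-plans. The sketch below shows one generic way to do that (structural deduplication, sometimes called hash-consing); it is not how Flink's optimizer works, and representing plans as nested tuples `(operator, argument, *inputs)` is an assumption made for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical plan representation: (operator, argument, *input_plans).
Plan = Tuple


def count_subplan_uses(sink_plans: List[Plan]) -> Dict[Plan, int]:
    """Count how often each distinct sub-plan occurs across all sinks.

    Structurally identical sub-plans are equal tuples with equal hashes, so
    keying a dict by the sub-plan merges them into one entry.
    """
    uses: Dict[Plan, int] = {}

    def visit(plan: Plan) -> None:
        uses[plan] = uses.get(plan, 0) + 1
        for child in plan[2:]:
            visit(child)

    for plan in sink_plans:
        visit(plan)
    return uses


if __name__ == "__main__":
    scan = ("scan", "data.csv")
    sinks = [("sink", f"out{i}", ("filter", f"f{i}", scan)) for i in range(3)]
    uses = count_subplan_uses(sinks)
    shared = [p for p, n in uses.items() if n > 1]
    print("distinct operators:", len(uses), "shared:", shared)
```

Here three sinks reading the same CSV yield 7 distinct operators instead of 9, and the scan is the only shared one, so an optimizer that sees all sinks at once could execute it a single time.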
>
>
> 2018-02-19 2:19 GMT+01:00 Darshan Singh <darshan.meel@gmail.com>:
>
>> Thanks for the reply.
>>
>> I guess I am not looking for an alternative. I am trying to understand
>> what Flink does in this scenario, and if 10 tasks are running in parallel,
>> I am sure they will each be reading the CSV, as there is no other way.
>>
>> Thanks
>>
>> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman <niclas@hedhman.org>
>> wrote:
>>
>>>
>>> Do you really need the large single table created in step 2?
>>>
>>> If not, what you typically do is have the CSV source first perform the
>>> common transformations. Then, depending on whether the 10 outputs have
>>> different processing paths or not, you either do a split() to do
>>> individual processing based on some criteria, or you just have the sink
>>> put each record in separate tables.
>>> You have full control, at each step along the transformation path, over
>>> whether it can be parallelized or not, and if there are no sequential
>>> constraints in your model, you can easily fill all cores on all hosts.
>>>
>>> Even if you need the step 2 table, I would still just treat that as a
>>> split(), a branch ending in a Sink that does the storage there. There is
>>> no need to read records from the file over and over again, nor to store
>>> them first in the step 2 table and read them out again.
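Niclas's suggestion boils down to reading the input once and fanning each record out to every branch that wants it. Below is a plain-Python model of that idea, not the Flink API; `split_once` and the route predicates are hypothetical. In the DataStream API of that time the corresponding operator was `split()`; later releases recommend side outputs instead.

```python
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]


def split_once(records: Iterable[Record],
               routes: Dict[str, Callable[[Record], bool]]) -> Dict[str, List[Record]]:
    """Consume `records` once and append each record to every matching output.

    A record can land in several outputs, just like rows passing several
    independent filters.
    """
    outputs: Dict[str, List[Record]] = {name: [] for name in routes}
    for rec in records:  # the only pass over the input
        for name, predicate in routes.items():
            if predicate(rec):
                outputs[name].append(rec)
    return outputs


if __name__ == "__main__":
    rows = [{"id": i, "country": "DE" if i % 2 else "IN"} for i in range(6)]
    out = split_once(rows, {
        "de": lambda r: r["country"] == "DE",
        "even_id": lambda r: r["id"] % 2 == 0,
    })
    print({name: [r["id"] for r in recs] for name, recs in out.items()})
```

Because `records` may be a one-shot iterator (for example, lines streamed from a file), the source is consumed exactly once no matter how many routes there are.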
>>>
>>> Don't ask *me* about what happens in failure scenarios... I have not
>>> figured that out myself yet.
>>>
>>> HTH
>>> Niclas
>>>
>>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh <darshan.meel@gmail.com>
>>> wrote:
>>>
>>>> Hi, I would like to understand the execution model.
>>>>
>>>> 1. I have a CSV file which is, say, 10 GB.
>>>> 2. I created a table from this file.
>>>>
>>>> 3. Now I have created filtered tables on it, say 10 of them.
>>>> 4. Now I call writeToSink for all these 10 filtered tables.
>>>>
>>>> My first question: will these 10 filtered tables be written in parallel
>>>> (suppose I have 40 cores and set the parallelism to, say, 40 as well)?
>>>>
>>>> My next question is about the table I created from the CSV file, which
>>>> is common to all of them: it won't be persisted by Flink internally;
>>>> rather, for each of the 10 filtered tables Flink will read the CSV file,
>>>> apply the filter, and write to the sink.
>>>>
>>>> I think that for all 10 filtered tables it will read the CSV again and
>>>> again, so in this case it will be read 10 times. Is my understanding
>>>> correct, or am I missing something?
>>>>
>>>> What if in step 2 I convert the table to a DataSet and back?
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>>
>>> --
>>> Niclas Hedhman, Software Developer
>>> http://polygene.apache.org - New Energy for Java
>>>
>>
>>
>
>
