flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Writing Parquet files with Flink
Date Fri, 29 Jan 2016 11:11:43 GMT
Yes, make both block sizes the same and you're good.
I think you can neglect the overhead, unless we are talking about
thousands of small files (smaller than the block size).
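To make the two sizes match in practice, both values can be set on the Hadoop Job
configuration that is passed to the output format. A minimal sketch, assuming the Parquet
files are written through the Hadoop ParquetOutputFormat (e.g. via Flink's Hadoop
compatibility wrapper); the 128 MB value is only an example:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.hadoop.ParquetOutputFormat;

    public class ParquetBlockSizeConfig {
      public static Job configuredJob() throws Exception {
        Job job = Job.getInstance();

        // HDFS block size of the output files (128 MB here; use your cluster's dfs.blocksize)
        long blockSize = 128L * 1024 * 1024;
        job.getConfiguration().setLong("dfs.blocksize", blockSize);

        // Parquet row group ("block") size, aligned with the HDFS block size
        ParquetOutputFormat.setBlockSize(job, (int) blockSize);
        // equivalent: job.getConfiguration().setInt("parquet.block.size", (int) blockSize);

        return job;
      }
    }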

2016-01-29 12:06 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:

> So there's no need to worry about the number or size of Parquet files from
> the Flink point of view if I correctly set the Parquet block size (equal to
> the HDFS block size)...
> It only affects the Parquet file overhead (header and footer present in
> each file) and the HDFS resources required to handle them (one object for
> each HDFS file), right?
> On Fri, Jan 29, 2016 at 11:56 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>> The number of input splits does not depend on the number of files but on
>> the number of HDFS blocks of all files.
>> Reading a single file with 100 HDFS blocks and reading 100 files with
>> 1 block each should both be divided into 100 input splits, which can be read by
>> 100 tasks concurrently (or fewer tasks with lazy assignment).
>> If you get fewer splits than HDFS blocks, you should check the
>> implementation of the getInputSplits() method in your InputFormat.
>> Best, Fabian
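A quick way to sanity-check how many splits a file-based input format generates; this
sketch uses Flink's TextInputFormat only as a stand-in for any FileInputFormat, and the
path and minNumSplits hint are made-up examples:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class SplitCount {
      public static void main(String[] args) throws Exception {
        TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/some-dir"));
        format.configure(new Configuration());
        // The number of splits follows the HDFS blocks of all files, not the file count.
        FileInputSplit[] splits = format.createInputSplits(100);
        System.out.println("input splits: " + splits.length);
      }
    }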
>> 2016-01-29 11:49 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>> Hi Fabian,
>>> thanks for the response!
>>> From my understanding (correct me if I'm wrong), once I produce
>>> some Parquet dir that I want to read later, the number of files in the dir
>>> affects the initial parallelism of the next job, i.e.:
>>>  - If I have fewer files than available tasks I will not fully exploit the
>>> parallelism
>>>  - If the number of Parquet files is greater than the number of tasks, the
>>> tasks will read the files as soon as possible (at the maximum parallelism,
>>> but depending on the speed of the pipeline)
>>> Having a single huge Parquet file could limit the performance of my
>>> Flink job because the default Hadoop IF can't exploit the parallelism at
>>> the data source (because it relies only on the number of files found). To
>>> avoid that, I should write a custom ParquetInputFormat able to preprocess
>>> all Parquet metadata in those files, extract the HDFS blocks to read, and
>>> then generate the InputSplits. Am I right? Or am I misunderstanding
>>> something?
>>> Best,
>>> Flavio
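For reference, a minimal sketch of reading Parquet through Flink's Hadoop compatibility
layer, where split generation is delegated to the Hadoop ParquetInputFormat; the path,
the Avro-based format, and the Flink/Parquet versions are assumptions, not details from
this thread:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetInputFormat;

    public class ReadParquet {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Job job = Job.getInstance();

        // The Hadoop input format reads the Parquet footers and creates the input splits,
        // so even a single large file can be read by multiple parallel tasks.
        DataSet<Tuple2<Void, GenericRecord>> records = env.readHadoopFile(
            new AvroParquetInputFormat<GenericRecord>(),
            Void.class, GenericRecord.class,
            "hdfs:///path/to/parquet-dir", job);

        System.out.println("records: " + records.count());
      }
    }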
>>> On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>> Hi Flavio,
>>>> using a default FileOutputFormat, Flink writes one output file for each
>>>> data sink task, i.e., as many files as the defined parallelism.
>>>> The size of these files depends on the total output size and the data
>>>> distribution. If you write to HDFS, a file consists of one or more HDFS
>>>> blocks.
>>>> Parquet files are internally also organized in blocks. Each Parquet
>>>> block has a header with some meta information, and the data is organized and
>>>> compressed in a columnar fashion within a block. Because of this, the
>>>> ParquetInputFormat must always read a complete Parquet block.
>>>> Flink's FileInputFormats split the input data along the HDFS blocks and
>>>> try to assign input splits such that blocks can be read locally. For best
>>>> performance, Parquet blocks should be aligned with HDFS blocks. It is not a
>>>> problem if a Parquet block is not completely filled.
>>>> If you want to control the size of the parallel output files, you would
>>>> need to know the total output size and choose the parallelism accordingly.
>>>> Flink is not able to infer the output size (depends on input size, task
>>>> semantics, data distribution, etc.), so it is up to you to choose the right
>>>> parallelism.
>>>> Best, Fabian
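Since the parallelism is the knob that controls the number (and therefore the size) of the
output files, a rough sketch of the idea; the writeAsText sink stands in for the actual
Parquet output format, and the size estimate is a made-up example:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ControlledOutputSize {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> result = env.fromElements("a", "b", "c");  // placeholder pipeline

        // Roughly: targetParallelism ~= estimated total output size / HDFS block size.
        // E.g. ~1.2 GB of output with 128 MB blocks -> 10 sink tasks, ~1 block per file.
        result.writeAsText("hdfs:///path/to/output").setParallelism(10);

        env.execute("controlled-output-size");
      }
    }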
>>>> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>> Hi to all,
>>>>> I was reading about optimal Parquet file size and HDFS block size.
>>>>> The ideal situation for Parquet is when its block size (and thus the
>>>>> maximum size of each row group) is equal to the HDFS block size. The
>>>>> default behaviour of Flink is that the output file size depends on the
>>>>> output parallelism, so I don't know how to achieve that.
>>>>> Is that feasible?
>>>>> Best,
>>>>> Flavio
