flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Writing Parquet files with Flink
Date Fri, 29 Jan 2016 10:56:08 GMT
The number of input splits does not depend on the number of files but on
the number of HDFS blocks of all files.
Reading a single file with 100 HDFS blocks and reading 100 files with 1
block each should both be divided into 100 input splits, which can be read
by 100 tasks concurrently (or by fewer tasks with lazy assignment).
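To make the arithmetic concrete, here is a minimal sketch in plain Java with no Flink dependency. `SplitCount` is a hypothetical helper, not Flink API. It assumes one split per HDFS block and a fixed block size, and it ignores details such as empty files or a requested minimum number of splits:

```java
import java.util.Collections;
import java.util.List;

/**
 * Sketch: estimate how many block-based input splits a file-based input
 * format would create, assuming one split per HDFS block.
 * All sizes are in bytes.
 */
public final class SplitCount {
    private SplitCount() {}

    /** Number of HDFS blocks a file of the given size occupies (empty file: 0). */
    static long blocksOf(long fileSize, long blockSize) {
        if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be > 0");
        return (fileSize + blockSize - 1) / blockSize; // ceiling division
    }

    /** Total splits across all files: the sum of the per-file block counts. */
    static long countSplits(List<Long> fileSizes, long blockSize) {
        long total = 0;
        for (long size : fileSizes) total += blocksOf(size, blockSize);
        return total;
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024; // assumed 128 MB HDFS block size
        // One file spanning 100 blocks vs. 100 files of one block each.
        System.out.println(countSplits(List.of(100 * block), block));
        System.out.println(countSplits(Collections.nCopies(100, block), block));
    }
}
```

Both calls print 100. The split count follows the block layout, not the file count.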

If you get fewer splits than HDFS blocks, you should check the
implementation of the getInputSplits() method in your InputFormat.
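To sanity-check a custom format, you can compare its splits against the block layout of each file. The following is a minimal sketch in plain Java. `BlockSplits` and `Split` are hypothetical names, not Flink classes. It assumes fixed-size blocks and produces one byte-range split per block. A split-generating method that returns noticeably fewer splits than this for the same files would be worth a closer look:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: one byte-range split per HDFS block, assuming every block except
 * possibly the last holds exactly blockSize bytes.
 */
public final class BlockSplits {
    private BlockSplits() {}

    /** Hypothetical split descriptor: bytes [start, start + length) of one file. */
    record Split(String path, long start, long length) {}

    static List<Split> splitsFor(String path, long fileSize, long blockSize) {
        if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be > 0");
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileSize; start += blockSize) {
            // The last block of a file may be shorter than blockSize.
            splits.add(new Split(path, start, Math.min(blockSize, fileSize - start)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical path; 250 bytes with an unrealistically small 100-byte block size.
        for (Split s : splitsFor("/data/part-1.parquet", 250, 100)) {
            System.out.println(s);
        }
    }
}
```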

Best, Fabian

2016-01-29 11:49 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Hi Fabian,
> thanks for the response!
> From my understanding (correct me if I'm wrong), once I produce
> a Parquet dir that I want to read later, the number of files in the dir
> affects the initial parallelism of the next job, i.e.:
>  - If I have fewer files than available tasks, I will not fully exploit
> parallelism.
>  - If the number of Parquet files is greater than the number of tasks, the
> tasks will read the files as soon as possible (at maximum parallelism, but
> depending on the speed of the pipeline).
> Having a single huge Parquet file could limit the performance of my Flink
> job because the default Hadoop InputFormat can't exploit parallelism at the
> data source (because it relies only on the number of files found). To avoid
> that, I should write a custom ParquetInputFormat able to preprocess the
> Parquet metadata in those files, extract the HDFS blocks to read, and then
> generate the InputSplits. Am I right, or am I misunderstanding something?
> Best,
> Flavio
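However the splits are created, a Parquet reader still has to read each row group (Parquet block) completely. The following is a minimal sketch of one way to combine block-aligned splits with that constraint. It assumes each row group is handed to the split whose byte range contains the row group's midpoint. That policy is an assumption here, so check which rule your Parquet reader actually applies. `RowGroupAssignment` and `RowGroup` are hypothetical names. In practice, offsets and sizes would come from the Parquet footer metadata:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: assign whole Parquet row groups to block-aligned splits so that
 * each row group is read by exactly one split (assumed midpoint policy).
 */
public final class RowGroupAssignment {
    private RowGroupAssignment() {}

    /** Hypothetical row-group descriptor: byte offset and size within the file. */
    record RowGroup(long offset, long size) {
        long midpoint() { return offset + size / 2; }
    }

    /** Returns, for each block-aligned split, the row groups it should read. */
    static List<List<RowGroup>> assign(List<RowGroup> rowGroups, long fileSize, long blockSize) {
        if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be > 0");
        int numSplits = (int) ((fileSize + blockSize - 1) / blockSize);
        List<List<RowGroup>> bySplit = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) bySplit.add(new ArrayList<>());
        for (RowGroup rg : rowGroups) {
            // Index of the block containing the midpoint, clamped to the last split.
            int split = (int) Math.min(rg.midpoint() / blockSize, numSplits - 1);
            bySplit.get(split).add(rg);
        }
        return bySplit;
    }

    public static void main(String[] args) {
        // Assumed layout: 300-byte file, 100-byte blocks, three row groups.
        List<RowGroup> rgs = List.of(new RowGroup(4, 90), new RowGroup(94, 110), new RowGroup(204, 90));
        List<List<RowGroup>> bySplit = assign(rgs, 300, 100);
        for (int i = 0; i < bySplit.size(); i++) {
            System.out.println("split " + i + ": " + bySplit.get(i));
        }
    }
}
```

In this example the second row group straddles block boundaries. Part of it would therefore be read from a block other than the one its split is assigned to, which is what aligning Parquet blocks with HDFS blocks avoids.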
> On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>> Hi Flavio,
>> using a default FileOutputFormat, Flink writes one output file for each
>> data sink task, i.e., as many files as the defined parallelism.
>> The size of these files depends on the total output size and the
>> distribution. If you write to HDFS, a file consists of one or more HDFS
>> blocks.
>> Parquet files are internally also organized in blocks. Each Parquet block
>> has a header with some meta information, and data is organized and
>> compressed in a columnar fashion within a block. Due to this, the
>> ParquetInputFormat must always read a complete Parquet block.
>> Flink's FileInputFormats split the input data along the HDFS blocks and
>> try to assign input splits such that blocks can be locally read. For best
>> performance, Parquet blocks should be aligned with HDFS blocks. It is not a
>> problem if a Parquet block is not completely filled.
>> If you want to control the size of the parallel output files, you would
>> need to know the total output size and choose the parallelism accordingly.
>> Flink is not able to infer the output size (depends on input size, task
>> semantics, data distribution, etc.), so it is up to you to choose the right
>> parallelism.
>> Best, Fabian
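Because the output size has to come from the user, choosing the parallelism reduces to a ceiling division. The following is a minimal sketch that assumes a user-supplied size estimate and a roughly even data distribution across sink tasks. `SinkParallelism` is a hypothetical helper. The result could then be used as the parallelism of the data sink, for example via `setParallelism(...)` on the sink in the DataSet API:

```java
/**
 * Sketch: pick a sink parallelism so that each parallel output file is
 * roughly targetFileBytes in size. The output size is an estimate the
 * user must supply, since Flink cannot infer it.
 */
public final class SinkParallelism {
    private SinkParallelism() {}

    static int parallelismFor(long estimatedOutputBytes, long targetFileBytes, int maxParallelism) {
        if (targetFileBytes <= 0 || maxParallelism <= 0) {
            throw new IllegalArgumentException("target size and max parallelism must be > 0");
        }
        long files = (estimatedOutputBytes + targetFileBytes - 1) / targetFileBytes; // ceiling
        // At least one task, and never more than the available parallelism.
        return (int) Math.max(1, Math.min(files, maxParallelism));
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024;          // assumed HDFS / Parquet block size
        long estimate = 10L * 1024 * 1024 * 1024; // assumed ~10 GB of output
        System.out.println(parallelismFor(estimate, block, 200)); // prints 80
    }
}
```

With skewed data, some files will still come out larger than the target. The estimate only controls the average file size.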
>> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>> Hi to all,
>>> I was reading about optimal Parquet file size and HDFS block size.
>>> The ideal situation for Parquet is when its block size (and thus the
>>> maximum size of each row group) is equal to the HDFS block size. By
>>> default, the size of each file Flink writes depends on the output
>>> parallelism, so I don't know how to achieve that.
>>> Is that feasible?
>>> Best,
>>> Flavio
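To check whether a written file actually achieves the alignment discussed in this thread, you can test whether every row group lies inside a single HDFS block. The following is a minimal sketch that assumes row-group offsets and sizes have already been read from the Parquet footer. `RowGroupAlignment` and `RowGroup` are hypothetical names. Setting the writer's row-group size (for parquet-mr, the `parquet.block.size` property) equal to the HDFS block size is not sufficient on its own if a row group starts in the middle of a block:

```java
import java.util.List;

/**
 * Sketch: check that each Parquet row group lies entirely within one HDFS
 * block. Offsets and sizes are in bytes.
 */
public final class RowGroupAlignment {
    private RowGroupAlignment() {}

    /** Hypothetical row-group descriptor: byte offset and size within the file. */
    record RowGroup(long offset, long size) {}

    /** True if the byte range [offset, offset + size) touches exactly one block. */
    static boolean fitsInOneBlock(RowGroup rg, long blockSize) {
        if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be > 0");
        if (rg.size() == 0) return true; // an empty row group spans no bytes
        long firstBlock = rg.offset() / blockSize;
        long lastBlock = (rg.offset() + rg.size() - 1) / blockSize;
        return firstBlock == lastBlock;
    }

    static boolean allAligned(List<RowGroup> rowGroups, long blockSize) {
        return rowGroups.stream().allMatch(rg -> fitsInOneBlock(rg, blockSize));
    }

    public static void main(String[] args) {
        long block = 100; // unrealistically small block size to keep numbers readable
        System.out.println(fitsInOneBlock(new RowGroup(4, 90), block));    // true
        System.out.println(fitsInOneBlock(new RowGroup(94, 90), block));   // false: crosses byte 100
        System.out.println(fitsInOneBlock(new RowGroup(100, 100), block)); // true: exactly one block
    }
}
```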
