flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Flink loading an S3 File out of order
Date Thu, 10 Mar 2016 17:09:40 GMT
Hi Benjamin,

Flink usually reads data in parallel. This is done by splitting the input
(e.g., a file) into several input splits, each of which is processed
independently. Since splits are usually processed concurrently by more than
one task, Flink does not preserve the order of the input by default.
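To make the effect concrete, here is a small plain-Scala model (no Flink dependency) of splitting a file into input splits and emitting them in the order in which the reading tasks finish. `SplitModel`, `makeSplits` and `emit` are hypothetical names. For simplicity the model cuts the file by line ranges, whereas Flink's file splits are, as far as I know, based on byte ranges. The finish order is passed in explicitly so that the example is deterministic.

```scala
// Plain-Scala model (not Flink code) of how input splits can reorder lines.
// All names here are hypothetical.
object SplitModel {
  // A split covers a contiguous, half-open range of line indices [start, end).
  final case class Split(id: Int, start: Int, end: Int)

  // Cuts `numLines` lines into `numSplits` contiguous splits of roughly equal size.
  def makeSplits(numLines: Int, numSplits: Int): Vector[Split] = {
    val size = math.ceil(numLines.toDouble / numSplits).toInt
    (0 until numSplits).toVector.map { i =>
      Split(i, math.min(i * size, numLines), math.min((i + 1) * size, numLines))
    }
  }

  // Each split is read independently; the sink sees each split's lines in
  // whatever order the reading tasks happen to finish (`finishOrder`).
  def emit(lines: Vector[String], splits: Vector[Split], finishOrder: Seq[Int]): Vector[String] =
    finishOrder.toVector.flatMap { id =>
      val s = splits(id)
      lines.slice(s.start, s.end)
    }
}
```

If the task reading the second split finishes first, the output starts in the middle of the file and `line 1` only shows up halfway through, which is the same kind of pattern as in your output.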

You can implement a special InputFormat that uses a custom
InputSplitAssigner to ensure that splits are handed out in order.
This would require a bit of coding, though.
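Roughly, the idea looks like this, sketched as a plain-Scala model rather than real Flink code. `OrderedSplitAssigner`, `nextSplit` and `OrderedRead` are hypothetical; they only mimic the shape of Flink's `InputSplitAssigner`, whose real method in the versions I know is `getNextInputSplit(host, taskId)` (please check the Javadoc of your version). The model assumes a single reading task: with several parallel readers, handing out splits in order is not enough on its own, because the tasks still emit their records concurrently.

```scala
// Plain-Scala model of an "ordered" split assigner (hypothetical names):
// whichever task asks, it always hands out the unassigned split with the
// lowest index, i.e. splits leave the assigner in file order.
final class OrderedSplitAssigner(numSplits: Int) {
  private var next = 0

  // Returns the index of the next split in file order, or None once all splits
  // have been handed out. `synchronized` because tasks may ask concurrently.
  // `taskId` is ignored on purpose: the order does not depend on who asks.
  def nextSplit(taskId: Int): Option[Int] = synchronized {
    if (next < numSplits) { val s = next; next += 1; Some(s) } else None
  }
}

object OrderedRead {
  // Models a single reading task (parallelism 1) that keeps requesting splits
  // and reads each one fully before asking for the next.
  def readAll(lines: Vector[String], splitSize: Int): Vector[String] = {
    val splits = lines.grouped(splitSize).toVector
    val assigner = new OrderedSplitAssigner(splits.size)
    Iterator
      .continually(assigner.nextSplit(taskId = 0))
      .takeWhile(_.isDefined)
      .flatMap(s => splits(s.get))
      .toVector
  }
}
```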

A DataSet is usually distributed among multiple partitions/tasks and does
not have the concept of a (complete) order either. It is possible to sort
the data of a DataSet within each individual partition by calling
DataSet.sortPartition(key, order). If you do that with a parallelism of one
(DataSet.sortPartition().setParallelism(1)), you'll have a fully ordered
data set, however only on one machine.
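The difference between sorting each partition and sorting with a parallelism of one can be modeled in plain Scala by treating a DataSet as a vector of partitions. `SortModel` and its methods are hypothetical stand-ins for `sortPartition()` and `setParallelism(1)`, not Flink API.

```scala
// Plain-Scala model: a "DataSet" is a vector of partitions (hypothetical names).
object SortModel {
  type Partitioned[A] = Vector[Vector[A]]

  // Like sortPartition(): each partition is sorted on its own and no records
  // move between partitions, so there is no order across partitions.
  def sortEachPartition[A: Ordering](ds: Partitioned[A]): Partitioned[A] =
    ds.map(_.sorted)

  // Like running with parallelism 1: all records end up in one partition.
  def toSinglePartition[A](ds: Partitioned[A]): Partitioned[A] =
    Vector(ds.flatten)
}
```

Sorting two partitions such as `Vector(5, 1, 9)` and `Vector(4, 8, 2)` locally gives `1, 5, 9` and `2, 4, 8`, which is not a global order; only after collapsing to a single partition does the sort produce `1, 2, 4, 5, 8, 9`.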
Flink also supports range partitioning (DataSet.partitionByRange()) in
case you want to sort the data in parallel.
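Range partitioning can be modeled the same way: each record is routed to a partition by comparing it against sorted split points, each partition is then sorted locally, and reading the partitions in index order yields a globally sorted result. `RangeModel` is hypothetical. In Flink, as far as I know, the split points are derived from a sample of the data, whereas here they are passed in by hand.

```scala
// Plain-Scala model of range partitioning followed by per-partition sorting.
// RangeModel and its methods are hypothetical, not Flink API.
object RangeModel {
  // Partition index for `x`, given split points sorted ascending: partition 0
  // holds keys <= bounds(0), partition i holds keys in (bounds(i - 1), bounds(i)],
  // and the last partition holds everything above the largest bound.
  def partitionOf(x: Int, bounds: Vector[Int]): Int = {
    val i = bounds.indexWhere(b => x <= b)
    if (i == -1) bounds.size else i
  }

  // Routes every record to its range partition (like partitionByRange()).
  def partitionByRange(data: Seq[Int], bounds: Vector[Int]): Vector[Vector[Int]] =
    Vector.tabulate(bounds.size + 1) { p =>
      data.filter(x => partitionOf(x, bounds) == p).toVector
    }

  // Sorts each partition independently (like sortPartition()); because the
  // ranges do not overlap, concatenating partitions in index order is sorted.
  def sortInParallel(data: Seq[Int], bounds: Vector[Int]): Vector[Vector[Int]] =
    partitionByRange(data, bounds).map(_.sorted)
}
```

Each partition can be sorted by a different task, so no single machine has to hold all the data, which is the advantage over sorting with a parallelism of one.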

Best, Fabian

2016-03-10 16:52 GMT+01:00 Benjamin Kadish <benjamin.kadish@adfin.com>:

> I am trying to read a file from S3 in the correct order. It seems that
> Flink is downloading the file out of order, or at least it's
> constructing the DataSet out of order. I tried using Hadoop to download
> the file and it seemed to download it in order.
> I am able to reproduce the problem with the following line:
> env.readTextFileWithValue(conf.options.get(S3FileName).get)
>    .writeAsText(s"${conf.output}/output",writeMode = FileSystem.WriteMode.OVERWRITE)
> The output looks something like
> line 1001
> line 1002
> ...
> line 1304
> line 1
> Is there a way to guarantee order?
> --
> Benjamin Kadish
> (260) 441-6159
