flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Dreyfus <dddrey...@gmail.com>
Subject Tasks, slots, and partitioned joins
Date Thu, 26 Oct 2017 02:13:33 GMT
Hello -

I have a large number of pairs of files. For purpose of discussion:
/source1/{1..10000} and /source2/{1..10000}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on.
I then want to union the results of the pair-wise joins and perform an

I create a simple flink job that has four sources, two joins, and two sinks
to produce intermediate results. This represents two unrelated chains.

I notice that when running this job with parallelism = 1 on a standalone
machine with one task manager and 3 slots, only one slot gets used. 

My concern is that when I scale up to a YARN cluster, flink will continue to
use one slot on one machine instead of using all slots on all machines.

Prior reading suggests all the data source subtasks are added to a default
resource group. Downstream tasks (joins and sinks) want to be colocated with
the data sources. The result is all of my tasks are executed in one slot.

Flink Stream (DataStream) offers the slotSharingGroup() function. This
doesn't seem available to the DataSet user.

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please

When I scale up the number of unrelated chains I notice that flink seems to
start all of them at the same time, which results in thrashing and errors -
lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My work around is to execute multiple,
sequential batches with results going into an intermediate directory, and
then a final job that aggregates the results. I would certainly prefer one
job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then
join the two, flink will shuffle and partition the files on the join key.
The /source1 and /source2 files represent this partitioning. They are reused
multiple times; thus, I shuffle and save the results creating /source1 and

*Q3:* Does flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition so that when I try to
join them, the unnecessary shuffle and repartition is avoided?

Thank you,

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

View raw message