flink-user mailing list archives

From qi luo <luoqi...@gmail.com>
Subject Re: Set partition number of Flink DataSet
Date Thu, 21 Mar 2019 12:48:40 GMT
Thank you Fabian! I will check these issues.

> On Mar 20, 2019, at 4:25 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi,
> 
> I'm sorry, but I'm only familiar with the high-level design, not with the implementation details and concrete roadmap for the involved components.
> I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition handling.
> 
> Best,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10288
> [2] https://issues.apache.org/jira/browse/FLINK-10429
> 
> 
> On Fri, Mar 15, 2019 at 12:13 PM, qi luo <luoqi.bd@gmail.com> wrote:
> Hi Fabian,
> 
> I understand this behavior is by design, since Flink was first built for streaming. Supporting batch shuffles and custom partition numbers in Flink could be compelling for batch processing.

> 
> Could you explain a bit more about what work needs to be done so that Flink can support custom partition numbers? We would be willing to help improve this area.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>> Hi,
>> 
>> Flink works a bit differently than Spark.
>> By default, Flink uses pipelined shuffles, which push the sender's results immediately to the receivers (by the way, this is one of the building blocks for stream processing).
>> However, pipelined shuffles require that all receivers are online. Hence, the number of partitions determines the number of running tasks.
>> There is also a batch shuffle mode, but it needs to be explicitly enabled and AFAIK does not remove the coupling between the number of partitions and task parallelism.
>> 
>> However, the community is currently working on many improvements for batch processing, including scheduling and fault-tolerance.
>> Batched shuffles are an important building block for this and there might be better support for your use case in the future.
>> 
>> Best, Fabian
>> 
>> On Fri, Mar 15, 2019 at 3:56 AM, qi luo <luoqi.bd@gmail.com> wrote:
>> Hi Ken,
>> 
>> That looks awesome! I’ve implemented something similar to your bucketing sink, but using multiple internal writers rather than multiple internal outputs.
>> 
>> Besides this, I’m also curious whether Flink can achieve this the way Spark does: allowing the user to specify the partition number in the partitionBy() method (so that multiple output formats are not needed). But this seems to need non-trivial changes in Flink core.
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>> 
>>> Hi Qi,
>>> 
>>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working version of a bucketing sink.
>>> 
>>> — Ken
>>> 
>>> 
>>>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi.bd@gmail.com> wrote:
>>>> 
>>>> Hi Ken,
>>>> 
>>>> Agreed. I will try partitionBy() to reduce the number of parallel sinks, and may also try sortPartition() so that each sink can write files one by one. Looking forward to your solution. :)
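
To make the sortPartition() idea concrete: if the records reaching one sink instance are already sorted by bucket, the sink can close each file before opening the next, so only one writer is open at any time. The following is a plain-Java sketch of that loop, not Flink code. SortedBucketWriter, writeSorted and the openBucket callback are hypothetical names, and records are modeled as (bucket, line) pairs purely for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Map;
import java.util.function.IntFunction;

/** Illustration only: writes bucket-sorted records with a single open writer. */
final class SortedBucketWriter {

    /**
     * Consumes (bucket, line) records that are assumed to be sorted by bucket.
     * openBucket is a hypothetical callback that opens the output for one bucket.
     * Returns how many times an output was opened; for sorted input this equals
     * the number of distinct buckets. Unsorted input would reopen buckets.
     */
    static int writeSorted(Iterable<Map.Entry<Integer, String>> records,
                           IntFunction<Writer> openBucket) {
        Integer current = null; // bucket whose writer is currently open
        Writer out = null;      // the only writer open at any time
        int opened = 0;
        try {
            for (Map.Entry<Integer, String> record : records) {
                if (!record.getKey().equals(current)) {
                    close(out); // finish the previous bucket before starting the next
                    current = record.getKey();
                    out = openBucket.apply(current);
                    opened++;
                }
                out.write(record.getValue());
                out.write('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            close(out); // closes the last bucket, also on failure
        }
        return opened;
    }

    private static void close(Writer w) {
        if (w == null) {
            return;
        }
        try {
            w.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off in this sketch is a sort per partition in exchange for constant writer memory. Without the sort, a sink would need one open writer per bucket it receives.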
>>>> 
>>>> Thanks,
>>>> Qi
>>>> 
>>>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>>>> 
>>>>> Hi Qi,
>>>>> 
>>>>>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi.bd@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Ken,
>>>>>> 
>>>>>> Do you mean that I can create a batch sink which writes to N files?

>>>>> 
>>>>> Correct.
>>>>> 
>>>>>> That sounds viable, but since our data size is huge (billions of records & thousands of files), the performance may be unacceptable. 
>>>>> 
>>>>> The main issue with performance (actually memory usage) is how many OutputFormats you need to have open at the same time.
>>>>> 
>>>>> If you partition by the same key that’s used to define buckets, then the max number is lower, as each parallel instance of the sink only gets a unique subset of all possible bucket values.
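
As a rough illustration of the point above about partitioning by the bucket key, here is a plain-Java sketch (no Flink classes) that counts how many distinct buckets the busiest sink instance would have to keep open. The names bucketOf, instanceOf and maxBucketsPerInstance are hypothetical, and the modulo routing is an assumption made for illustration, not Flink's actual hash partitioner.

```java
/** Illustration only: how bucket-aware routing bounds the open outputs per sink. */
final class BucketFanOut {

    /** Hypothetical bucket function: maps a key to one of numBuckets output files. */
    static int bucketOf(Object key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    /** Hypothetical routing: maps a bucket to one of `parallelism` sink instances. */
    static int instanceOf(int bucket, int parallelism) {
        return Math.floorMod(bucket, parallelism);
    }

    /** Largest number of distinct buckets that any single sink instance receives. */
    static int maxBucketsPerInstance(int numBuckets, int parallelism) {
        int[] counts = new int[parallelism];
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            counts[instanceOf(bucket, parallelism)]++;
        }
        int max = 0;
        for (int count : counts) {
            max = Math.max(max, count);
        }
        return max;
    }
}
```

Under this routing, 100K buckets at a parallelism of 200 give each instance 500 buckets, whereas routing that ignores the bucket key could send any of the 100K buckets to any instance. In a real job, a routing like this could presumably be supplied through DataSet.partitionCustom(); that wiring is not shown here.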
>>>>> 
>>>>> I’m actually dealing with something similar now, so I might have a solution to share soon.
>>>>> 
>>>>> — Ken
>>>>> 
>>>>> 
>>>>>> I will check Blink and give it a try anyway.
>>>>>> 
>>>>>> Thank you,
>>>>>> Qi
>>>>>> 
>>>>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>>>>>> 
>>>>>>> Hi Qi,
>>>>>>> 
>>>>>>> If I understand what you’re trying to do, then this sounds like a variation of a bucketing sink.
>>>>>>> 
>>>>>>> That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what’s used to partition the data).
>>>>>>> 
>>>>>>> But I don’t believe Flink has built-in support for that in batch mode (see BucketingSink <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> for streaming).
>>>>>>> 
>>>>>>> Maybe Blink has added that? Hoping someone who knows that codebase can chime in here.
>>>>>>> 
>>>>>>> Otherwise you’ll need to create a custom sink to implement the desired behavior - though abusing a MapPartitionFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html> would be easiest, I think.
>>>>>>> 
>>>>>>> — Ken
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi.bd@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Ken,
>>>>>>>> 
>>>>>>>> Thanks for your reply. I may not have made myself clear: our problem is not about reading but rather about writing. 
>>>>>>>> 
>>>>>>>> We need to write to N files based on key partitioning. We have to use setParallelism() to set the number of output partitions/files, but when the partition number is too large (~100K), the parallelism becomes too high. Is there any other way to achieve this?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Qi
>>>>>>>> 
>>>>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Qi,
>>>>>>>>> 
>>>>>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>>>>> 
>>>>>>>>> If so, then instead you want to do something like:
>>>>>>>>> 
>>>>>>>>> 	Job job = Job.getInstance();
>>>>>>>>> 
>>>>>>>>> 	for each file…
>>>>>>>>> 		FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>>>>>>> 
>>>>>>>>> 	env.createInput(HadoopInputs.createHadoopInput(…, job));
>>>>>>>>> 
>>>>>>>>> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>>>>>>>>> 
>>>>>>>>> — Ken
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi.bd@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> We’re trying to distribute batch input data to (N) HDFS files, partitioned by hash, using the DataSet API. What I’m doing is like:
>>>>>>>>>> 
>>>>>>>>>> env.createInput(…)
>>>>>>>>>>       .partitionByHash(0)
>>>>>>>>>>       .setParallelism(N)
>>>>>>>>>>       .output(…)
>>>>>>>>>> 
>>>>>>>>>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we cannot afford that many TMs.
>>>>>>>>>> 
>>>>>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or another way we can achieve a similar result? Thank you!
>>>>>>>>>> 
>>>>>>>>>> Qi
>>>>>>>>> 
>>>>>>>>> --------------------------
>>>>>>>>> Ken Krugler
>>>>>>>>> +1 530-210-6378
>>>>>>>>> http://www.scaleunlimited.com
>>>>>>>>> Custom big data solutions & training
>>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>>>>>> 

