flink-user mailing list archives

From: Ken Krugler <kkrugler_li...@transpac.com>
Subject: Re: Set partition number of Flink DataSet
Date: Tue, 12 Mar 2019 15:58:36 GMT
Hi Qi,

If I understand what you’re trying to do, then this sounds like a variation of a bucketing sink.

That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what’s used to partition the data).

But I don’t believe Flink has built-in support for that in batch mode (see BucketingSink <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> for streaming).

Maybe Blink has added that? Hoping someone who knows that codebase can chime in here.

Otherwise you’ll need to create a custom sink to implement the desired behavior - though abusing a MapPartitionFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html> would be easiest, I think.
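
Very roughly, that MapPartitionFunction workaround could look something like the sketch below. It’s untested and only illustrates the idea; NUM_BUCKETS, OUTPUT_DIR, and the Tuple2<Integer, String> record shape are placeholders I’m assuming, and cleanup of partially written files on failure is left out.

    import java.io.BufferedWriter;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    public class BucketedOutputSketch {

        private static final int NUM_BUCKETS = 100_000;                 // N output files (placeholder)
        private static final String OUTPUT_DIR = "hdfs:///tmp/buckets"; // placeholder path

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder input; in your case this would be env.createInput(...)
            DataSet<Tuple2<Integer, String>> records =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

            records
                // Derive the target bucket from the key, so the file count is fixed
                // at NUM_BUCKETS no matter what the parallelism is.
                .map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Tuple2<Integer, String> r) {
                        return Tuple2.of(Math.floorMod(r.f0.hashCode(), NUM_BUCKETS), r.f1);
                    }
                })
                // Route all records of a given bucket to the same subtask, so each
                // bucket file has exactly one writer.
                .partitionByHash(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values,
                                             Collector<Integer> out) throws Exception {
                        Map<Integer, BufferedWriter> writers = new HashMap<>();
                        for (Tuple2<Integer, String> value : values) {
                            BufferedWriter writer = writers.get(value.f0);
                            if (writer == null) {
                                // Lazily open one file per bucket seen in this partition.
                                Path path = new Path(OUTPUT_DIR, "part-" + value.f0);
                                FileSystem fs = path.getFileSystem();
                                writer = new BufferedWriter(new OutputStreamWriter(
                                    fs.create(path, FileSystem.WriteMode.NO_OVERWRITE),
                                    StandardCharsets.UTF_8));
                                writers.put(value.f0, writer);
                            }
                            writer.write(value.f1);
                            writer.newLine();
                        }
                        for (BufferedWriter writer : writers.values()) {
                            writer.close();
                        }
                    }
                })
                .setParallelism(32) // writer parallelism, independent of the file count
                .output(new DiscardingOutputFormat<>());

            env.execute("bucketed output sketch");
        }
    }

The point of the sketch is that the number of output files is fixed by NUM_BUCKETS, while setParallelism() only controls how many subtasks share the writing, so the two are decoupled. Since the files are written directly from user code, Flink’s normal sink handling doesn’t apply, and a failed attempt can leave partial files behind.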

— Ken



> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi.bd@gmail.com> wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not have made myself clear: our problem is not about reading, but rather writing.
> 
> We need to write to N files based on key partitioning. We have to use setParallelism() to set the output partition/file number, but when the partition number is too large (~100K), the parallelism would be too high. Is there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>     Job job = Job.getInstance();
>> 
>>     for each file…
>>         FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>> 
>>     env.createInput(HadoopInputs.createHadoopInput(…, job));
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi.bd@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data into N HDFS files, partitioned by hash, using the DataSet API. What I’m doing looks like this:
>>> 
>>> env.createInput(…)
>>>       .partitionByHash(0)
>>>       .setParallelism(N)
>>>       .output(…)
>>> 
>>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we can’t afford that many TMs.
>>> 
>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or any other way we can achieve a similar result? Thank you!
>>> 
>>> Qi
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

