flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: writeAsCSV with partitionBy
Date Tue, 16 Feb 2016 19:54:14 GMT
Yes, you're right. I did not understand your question correctly.

Right now, Flink does not feature an output format that writes records to
output files depending on a key attribute.
You would need to implement such an output format yourself and append it as
follows:

val data = ...
data.partitionByHash(0) // partition to send all records with the same key
to the same machine
  .output(new YourOutputFormat())

In case of many distinct keys, you would need to limit the number of open
file handles. The OF will be easier to implement, if you do a
sortPartition(0, Order.ASCENDING) before the output format to sort the data
by key.

Cheers, Fabian




2016-02-16 19:52 GMT+01:00 Srikanth <srikanth.ht@gmail.com>:

> Fabian,
>
> Not sure if we are on the same page. If I do something like below code, it
> will groupby field 0 and each task will write a separate part file in
> parallel.
>
>     val sink = data1.join(data2)
>     .where(1).equalTo(0) { ((l,r) => ( l._3, r._3) ) }
>     .partitionByHash(0)
>     .writeAsCsv(pathBase + "output/test", rowDelimiter="\n",
> fieldDelimiter="\t" , WriteMode.OVERWRITE)
>
> This will create folder ./output/test/<1,2,3,4...>
>
> But what I was looking for is Hive style partitionBy that will output with
> folder structure
>
>    ./output/field0=1/file
>    ./output/field0=2/file
>    ./output/field0=3/file
>    ./output/field0=4/file
>
> Assuming field0 is Int and has unique values 1,2,3&4.
>
> Srikanth
>
>
> On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Srikanth,
>>
>> DataSet.partitionBy() will partition the data on the declared partition
>> fields.
>> If you append a DataSink with the same parallelism as the partition
>> operator, the data will be written out with the defined partitioning.
>> It should be possible to achieve the behavior you described using
>> DataSet.partitionByHash() or partitionByRange().
>>
>> Best, Fabian
>>
>>
>> 2016-02-12 20:53 GMT+01:00 Srikanth <srikanth.ht@gmail.com>:
>>
>>> Hello,
>>>
>>>
>>>
>>> Is there a Hive(or Spark dataframe) partitionBy equivalent in Flink?
>>>
>>> I'm looking to save output as CSV files partitioned by two columns(date
>>> and hour).
>>>
>>> The partitionBy dataset API is more to partition the data based on a
>>> column for further processing.
>>>
>>>
>>>
>>> I'm thinking there is no direct API to do this. But what will be the
>>> best way of achieving this?
>>>
>>>
>>>
>>> Srikanth
>>>
>>>
>>>
>>
>>
>

Mime
View raw message