flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Split csv file in equally sized slices (less than or equal)
Date Mon, 28 Nov 2016 20:08:29 GMT
Great, thanks!

On 28 Nov 2016 8:54 p.m., "Fabian Hueske" <fhueske@gmail.com> wrote:

> Hi Flavio,
>
> sure.
> This code should be close to what you need:
>
> // needs: java.util.Arrays, org.apache.flink.api.common.functions.MapPartitionFunction,
> //        org.apache.flink.util.Collector
> public static class BatchingMapper implements MapPartitionFunction<String, String[]> {
>
>    int cnt = 0;
>    String[] batch = new String[1000];
>
>    @Override
>    public void mapPartition(Iterable<String> values, Collector<String[]> out) throws Exception {
>       for(String v : values) {
>          batch[cnt++] = v;
>          if (cnt == 1000) {
>             // emit the full batch
>             out.collect(batch);
>             // start a fresh array so an emitted batch is never modified afterwards
>             batch = new String[1000];
>             cnt = 0;
>          }
>       }
>       // handle the last, possibly incomplete batch (skip it if it is empty)
>       if (cnt > 0) {
>          out.collect(Arrays.copyOf(batch, cnt));
>          cnt = 0;
>       }
>    }
> }
>
> Cheers, Fabian
>
> 2016-11-28 20:44 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Thanks for the support Fabian!
>> I think I'll try the tumbling window method, it seems cleaner. Btw, just
>> for the sake of completeness, can you show me a brief snippet (also in
>> pseudocode) of a mapPartition that groups together elements into chunks of
>> size n?
>>
>> Best,
>> Flavio
>>
>> On Mon, Nov 28, 2016 at 8:24 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> I think the easiest solution is to read the CSV file with the
>>> CsvInputFormat and use a subsequent MapPartition to batch 1000 rows
>>> together.
>>> In each partition, you might end up with an incomplete batch.
>>> However, I don't see yet how you can feed these batches into the
>>> JdbcInputFormat which does not accept a DataSet as input. You could create
>>> a RichMapFunction that contains the logic of the JdbcInputFormat to
>>> directly query the database with the input of the MapPartitionOperator.
>>>
>>> If you want to use the DataStream API, you can use a tumbling count
>>> window to group IDs together and query the external database in a
>>> subsequent Map operator.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>>
>>> 2016-11-28 18:32 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> Hi to all,
>>>>
>>>> I have a use case where I have to read a huge csv containing ids to
>>>> fetch from a table in a db.
>>>> The jdbc input format can handle parameterized queries so I was
>>>> thinking to fetch data using 1000 ids at a time. What is the easiest way to
>>>> divide a dataset into slices of 1000 ids each (in order to create parameters
>>>> for my JDBC input format)? Is that possible?
>>>> Or maybe there's an easier solution using the streaming APIs?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>
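
For readers who want to try the batching idea from this thread outside of Flink, here is a minimal plain-Java sketch. It is not code from the thread and uses no Flink API: the class name Batching, the helper names batches and inQuery, and the table and column names (my_table, id) are all hypothetical and only serve as illustration. It groups an Iterable into chunks of at most n elements, with a possibly smaller last chunk, and builds a parameterized IN query with one placeholder per id in a batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or of the thread above.
final class Batching {

    // Groups the input into lists of at most n elements; only the last list may be smaller.
    static <T> List<List<T>> batches(Iterable<T> values, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>(n);
        for (T v : values) {
            current.add(v);
            if (current.size() == n) {
                result.add(current);
                // start a fresh list so an emitted batch is never modified afterwards
                current = new ArrayList<>(n);
            }
        }
        // emit the trailing incomplete batch, but never an empty one
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Builds a parameterized query with one "?" per id in the batch.
    // "my_table" and "id" are assumed names, chosen only for this example.
    static String inQuery(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        StringBuilder sb = new StringBuilder("SELECT * FROM my_table WHERE id IN (");
        for (int i = 0; i < batchSize; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append('?');
        }
        return sb.append(')').toString();
    }
}
```

With five ids and n = 2, batches returns three lists of sizes 2, 2 and 1. The size of each list can then be passed to inQuery to get a statement whose placeholders are bound to that batch's ids.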
