flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Collector.collect
Date Mon, 01 May 2017 16:55:31 GMT
Oh you have multiple different output formats, missed that.

For the Batch API you are, I believe, correct: using a custom 
output format is the best solution.

In the Streaming API the code below should be equally fast, if the 
filtered sets don't overlap.

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

That is because all the filters would be chained to the source; hell, the 
sinks might be chained as well (not too sure on this one).
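
For reference, a rough, self-contained sketch of that pattern against the 
DataStream API (the inline source, the filter conditions and the /tmp output 
paths below are just placeholders, not anything from this thread):

import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterPerFormatJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this is the actual input stream.
        DataStream<String> input = env.fromElements("a-record", "b-record", "ab-record");

        // One filter + output format per target; both filters read the same stream
        // and chain to the source, so no extra serialization hop is added.
        input.filter(r -> r.contains("a"))
             .writeUsingOutputFormat(new TextOutputFormat<>(new Path("/tmp/outA")));
        input.filter(r -> r.contains("b"))
             .writeUsingOutputFormat(new TextOutputFormat<>(new Path("/tmp/outB")));

        env.execute("filter-per-output-format");
    }
}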

On 01.05.2017 17:05, Newport, Billy wrote:
>
> There is likely a bug then: the (ENUM, Record) stream going through a 
> filter and a set of output formats per filter was slower than the 
> (BITMASK, Record) stream going to a single OutputFormat which demuxes 
> the data to each file internally.
>
> Are you saying to do a custom writer inside a map, rather than either 
> of the two approaches above?
>
> *From:*Chesnay Schepler [mailto:chesnay@apache.org]
> *Sent:* Monday, May 01, 2017 10:41 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Collector.collect
>
> Hello,
>
> @Billy, what prevented you from duplicating/splitting the record, 
> based on the bitmask, in a map function before the sink?
> This shouldn't incur any serialization overhead if the sink is chained 
> to the map. The emitted Tuple could also share the
> GenericRecord; meaning you don't even have to copy it.
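
A rough sketch of that idea, written as a flatMap since one input can yield 
several tuples (Avro's GenericRecord and the one-bit-per-target mask layout 
are assumptions here, not code from this thread):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Emits one (targetIndex, record) tuple per bit set in the incoming mask.
// Every emitted tuple references the same GenericRecord instance, so the
// record is only serialized if the tuple actually leaves the operator chain.
public class BitmaskSplitter
        implements FlatMapFunction<Tuple2<Short, GenericRecord>, Tuple2<Short, GenericRecord>> {

    private final int numTargets;

    public BitmaskSplitter(int numTargets) {
        this.numTargets = numTargets;
    }

    @Override
    public void flatMap(Tuple2<Short, GenericRecord> value,
                        Collector<Tuple2<Short, GenericRecord>> out) {
        short mask = value.f0;
        for (short i = 0; i < numTargets; i++) {
            if ((mask & (1 << i)) != 0) {
                out.collect(Tuple2.of(i, value.f1));
            }
        }
    }
}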
>
> On 01.05.2017 14:52, Newport, Billy wrote:
>
>     We’ve done that but it’s very expensive from a serialization point
>     of view when writing the same record multiple times, each in a
>     different tuple.
>
>     For example, we started with this:
>
>     .collect(new Tuple2<Short, GenericRecord>(...));
>
>     The record would be written with short = 0 and again with short =
>     1, which results in the GenericRecord being serialized twice. You
>     also probably need filters on the output dataset, which is
>     expensive as well.
>
>     We switched instead to a bitmask. Now, we write the record once
>     and set a bit in the short for each file the record needs to be
>     written to. Our next step is to write records to files based on
>     the short. We wrote a new output format which checks the bits
>     in the short and writes the GenericRecord to each file for the
>     corresponding bit. This means no filter is needed to split the
>     records for each file, and it is much faster.
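
Roughly what such a demultiplexing output format could look like (a sketch 
only; the wrapped per-file formats and the one-bit-per-target convention are 
assumptions, not the actual code discussed here):

import java.io.IOException;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Wraps one OutputFormat per target file and routes each record to every
// file whose bit is set in the Short field of the tuple.
public class DemuxOutputFormat implements OutputFormat<Tuple2<Short, GenericRecord>> {

    private final List<OutputFormat<GenericRecord>> targets;

    public DemuxOutputFormat(List<OutputFormat<GenericRecord>> targets) {
        this.targets = targets;
    }

    @Override
    public void configure(Configuration parameters) {
        for (OutputFormat<GenericRecord> t : targets) {
            t.configure(parameters);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        for (OutputFormat<GenericRecord> t : targets) {
            t.open(taskNumber, numTasks);
        }
    }

    @Override
    public void writeRecord(Tuple2<Short, GenericRecord> record) throws IOException {
        short mask = record.f0;
        for (int i = 0; i < targets.size(); i++) {
            if ((mask & (1 << i)) != 0) {
                targets.get(i).writeRecord(record.f1);
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputFormat<GenericRecord> t : targets) {
            t.close();
        }
    }
}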
>
>     We’re finding a need to do this kind of optimization pretty
>     frequently with Flink.
>
>     *From:*Gaurav Khandelwal [mailto:gaurav671989@gmail.com]
>     *Sent:* Saturday, April 29, 2017 4:32 AM
>     *To:* user@flink.apache.org <mailto:user@flink.apache.org>
>     *Subject:* Collector.collect
>
>     Hello
>
>     I am working with RichProcessFunction and want to emit multiple
>     records at a time. To achieve this, I am currently doing:
>
>     while (condition) {
>         collector.collect(new Tuple2<>(...));
>     }
>
>     I was wondering: is this the correct way, or is there a better
>     alternative?
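
For reference, calling collect() several times per input element is fine. A 
minimal sketch, assuming a Flink version where ProcessFunction is an abstract 
class (on older versions this would be RichProcessFunction); the 
comma-splitting rule is just a placeholder:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits one output record per comma-separated token; the split is only a
// placeholder to show several collect() calls for a single input element.
public class MultiEmitFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        for (String token : value.split(",")) {
            out.collect(token);
        }
    }
}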
>

