flink-user mailing list archives

From Kamil Dziublinski <kamil.dziublin...@gmail.com>
Subject Re: Key by Task number
Date Tue, 18 Apr 2017 13:43:44 GMT
I am not sure you really need a keyBy; your load will be distributed among
your map functions without it. But could you explain a bit what your sink
is doing?


As for setting parallelism on the consumer, remember that you won't get
higher parallelism than the number of partitions in your topic. If you have
240 partitions that's fine, but if you have fewer, the extra subtasks will
be idle: only one consumer subtask can read from a given partition.
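
For reference, a rough sketch of what I mean, reusing the types from your
snippet below and leaving out the keyBy (the broker address and group id
are placeholders, not from your setup):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
properties.setProperty("group.id", "schema-records");          // placeholder

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 240 partitions -> at most 240 useful source subtasks, since each
// partition is read by exactly one consumer subtask.
env.addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties))
        .setParallelism(240)
        .flatMap(new SchemaRecordSplit())
        .setParallelism(240)
        .name("TopicA splitter")
        .addSink(new CustomMaprFsSink())
        .name("TopicA Sink")
        .setParallelism(240);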

On Tue, Apr 18, 2017 at 3:38 PM Telco Phone <telco5@yahoo.com> wrote:

>
> I am trying to use the task number as a keyby value to help fan out the
> work load reading from kafka.
>
>
> Given:
>
>        DataStream<SchemaRecord> stream = env
>                .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties))
>                .setParallelism(240)
>                .flatMap(new SchemaRecordSplit())
>                .setParallelism(240)
>                .name("TopicA splitter")
>                .keyBy("partition", "keyByHelper", "schemaId");
>
>        stream.addSink(new CustomMaprFsSink())
>                .name("TopicA Sink")
>                .setParallelism(240);
>
>
> In the DeserialClass I am trying to get at
>
> getRuntimeContext().getIndexOfThisSubtask();
>
> which is only available in the RichSinkFunction.
>
>
>
> The key above is partition (by hour) and schemaId (the Avro schema ID), and I
> would like to add the task number so that all 240 readers / writers have
> something to do.
>
> Any ideas?
>
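
On the getIndexOfThisSubtask() point: the subtask index is available
through getRuntimeContext() in any rich function, not only in a
RichSinkFunction, so the splitter itself can stamp it onto each record. A
minimal sketch, assuming SchemaRecord is a POJO with a public keyByHelper
field (names taken from your snippet):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class SchemaRecordSplit extends RichFlatMapFunction<SchemaRecord, SchemaRecord> {

    private int keyByHelper;

    @Override
    public void open(Configuration parameters) {
        // Index of this parallel subtask, in [0, parallelism).
        keyByHelper = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void flatMap(SchemaRecord record, Collector<SchemaRecord> out) {
        record.keyByHelper = keyByHelper; // the field named in keyBy(...)
        out.collect(record);
    }
}

Bear in mind that keyBy hash-partitions on the key, so 240 distinct index
values will not necessarily spread perfectly evenly across 240 downstream
subtasks.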
