Subject: Re: Key by Task number
To: user@flink.apache.org
From: Chesnay Schepler
Date: Tue, 18 Apr 2017 16:16:37 +0200
If the number of combinations between partition and schemaID is limited, then the subtask index could actually improve the distribution of values.

In any case, the easiest way to do this is to add a RichMapFunction after the flatMap, or modify the flatMap, to also include the subtask index.
Typically this would be done by creating a Tuple2 containing the index and value.
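A minimal sketch of that approach (the class name is hypothetical; this assumes the Flink Java API of that era):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical mapper: attaches this subtask's index to each record so a
// subsequent keyBy can spread the load across all parallel instances.
public class AttachSubtaskIndex<T> extends RichMapFunction<T, Tuple2<Integer, T>> {

    @Override
    public Tuple2<Integer, T> map(T value) throws Exception {
        // getRuntimeContext() is available in any Rich* function, not only sinks.
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        return new Tuple2<>(subtaskIndex, value);
    }
}
```

You would then key on the first tuple field, e.g. `.map(new AttachSubtaskIndex<SchemaRecord>()).setParallelism(240).keyBy(0)`.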

On 18.04.2017 15:43, Kamil Dziublinski wrote:
I am not sure if you really need a keyBy; your load will be distributed among your map functions without it. But could you explain a bit what your sink is doing?


As for setting parallelism on the consumer, remember that you won't get higher parallelism than the number of partitions in your topic.
If you have 240 partitions that's fine, but if you have fewer, the remaining subtasks will be idle. Only one subtask can read from a given partition at a time.

On Tue, Apr 18, 2017 at 3:38 PM Telco Phone <telco5@yahoo.com> wrote:

I am trying to use the task number as a keyBy value to help fan out the workload of reading from Kafka.


Given:

        DataStream<SchemaRecord> stream = env
                .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties))
                .setParallelism(240)
                .flatMap(new SchemaRecordSplit())
                .setParallelism(240)
                .name("TopicA splitter")
                .keyBy("partition", "keyByHelper", "schemaId");

        stream.addSink(new CustomMaprFsSink())
                .name("TopicA Sink")
                .setParallelism(240);


In the DeserialClass I am trying to get to

getRuntimeContext().getIndexOfThisSubtask(); 

which is only available in the RichSinkFunction.



The above keys are partition (by hour) and schemaID (the Avro schema ID), and I would like to add the task number so that all 240 readers/writers have something to do.

Any ideas?



