flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: [Discussion] Query Regarding Operator chaining
Date Thu, 14 Jul 2016 15:01:11 GMT
Aljoscha is right. Multiple consumers in the same consumer group can not
read from the same partition.
You'll need to create a Kafka topic with more partitions to have higher
parallelism.

On Wed, Jul 6, 2016 at 10:45 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> unfortunately the reading of one Kafka partition cannot be split among
> several parallel instances of the Kafka source. So if you have only 2
> partitions your reading parallelism is limited to that. You are right that
> this can lead to bad performance and underutilization. The only solution I
> see right now is to have more partitions in Kafka so that more readers can
> read in parallel.
>
> +Robert Adding Robert directly because he might have something more to say
> about this.
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Jul 2016 at 15:48 Vinay Patil <vinay18.patil@gmail.com> wrote:
>
>> Hi,
>>
>> The re-balance actually distributes it to all the task managers, and now
>> all TM's are getting utilized, You were right , I am seeing two
>> boxes(Tasks) now.
>>
>> I have one question regarding the task slots :
>>
>> For the source the parallelism is set to 56, now when we see on the UI and
>> click on source sub-task , I see 56 entries , out of which only two are
>> getting the data from Kafka (this may be because I have two kafka
>> partitions)
>>
>> The 56 entries that I am seeing for a sub-task on UI are the total task
>> slots of all TM's, right ?
>>
>> If yes, only two slots are getting utilized, how do I ensure enough task
>> slots are getting utilized at the source ? I have 7 task managers (8 cores
>> per TM), so if only 1 core each of two task manager is performing the
>> consume operation, wouldn't it hamper the performance.
>>
>> Even if two Task managers are utilized , all 16 slots should have been
>> used
>> , right ?
>>
>> For the other sub-task, for all 56 entries I am seeing bytes received.
>> (this may be because of applying rebalance after the source)
>>
>> P.S: I am reading over million records from Kafka , so need to utilize
>> enough resources [Performance is the key here].
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Jul 4, 2016 at 8:55 PM, Vinay Patil <vinay18.patil@gmail.com>
>> wrote:
>>
>> > Thanks a lot guys, this helps to understand better
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Mon, Jul 4, 2016 at 8:43 PM, Stephan Ewen <sewen@apache.org> wrote:
>> >
>> >> Just to be sure: Each *subtask* has one thread - so for each task,
>> there
>> >> are as many parallel threads (distributed across nodes) as your
>> >> parallelism
>> >> indicates.
>> >>
>> >> For most cases, having long chains and then a higher parallelism is a
>> good
>> >> choice.
>> >> Cases where individual functions (MapFunction, etc) do something very
>> CPU
>> >> intensive are cases where you may want to not chain them, so they get a
>> >> separate thread.
>> >>
>> >> If you see all tasks in one box in the UI, it probably means you have
>> only
>> >> "Filter" and "Map" as a function? In that case it is fine to have just
>> one
>> >> box (=Task) in the UI. The box still has parallelism via subtasks.
>> >>
>> >> If you insert a "rebalance()" between the Kafka Source and the
>> >> Map/Filter/etc it makes sure that the data distribution in the
>> >> Map/Filter/etc operators has best utilization independent of how the
>> data
>> >> was partitioned in Kafka.
>> >> You should then also see two boxes in the UI - one for the Kafka
>> Source,
>> >> one for the actual processing.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Jul 4, 2016 at 5:00 PM, Aljoscha Krettek <aljoscha@apache.org>
>> >> wrote:
>> >>
>> >> > Hi,
>> >> > chaining is useful to minimize communication overhead. But in your
>> case
>> >> you
>> >> > might benefit more from having good cluster utilization. There seems
>> to
>> >> be
>> >> > a tradeoff. Maybe you can run some easy tests to see how it behaves
>> for
>> >> > you.
>> >> >
>> >> > Cheers,
>> >> > Aljoscha
>> >> >
>> >> > On Mon, 4 Jul 2016 at 16:28 Vinay Patil <vinay18.patil@gmail.com>
>> >> wrote:
>> >> >
>> >> > > Thanks,
>> >> > >
>> >> > > so is operator chaining useful in terms of utilizing the resources
>> or
>> >> we
>> >> > > should keep the chaining to minimal use, say 3-4 operators and
>> disable
>> >> > > chaining ?
>> >> > > I am worried because I am seeing all the operators in one box
on
>> flink
>> >> > UI.
>> >> > >
>> >> > >
>> >> > > Regards,
>> >> > > Vinay Patil
>> >> > >
>> >> > > On Mon, Jul 4, 2016 at 7:13 PM, Aljoscha Krettek <
>> aljoscha@apache.org
>> >> >
>> >> > > wrote:
>> >> > >
>> >> > > > Hi,
>> >> > > > this is true, yes. If the number of Kafka partitions is less
than
>> >> the
>> >> > > > parallelism then some of the sources might not be utilized.
If
>> you
>> >> > > insert a
>> >> > > > rebalance after the sources you should be able to utilize
all the
>> >> > > > downstream operations equally.
>> >> > > >
>> >> > > > Cheers,
>> >> > > > Aljoscha
>> >> > > >
>> >> > > > On Mon, 4 Jul 2016 at 11:13 Vinay Patil <vinay18.patil@gmail.com
>> >
>> >> > wrote:
>> >> > > >
>> >> > > > > Just an update, the task will be executed by multiple
threads
>> , my
>> >> > bad
>> >> > > I
>> >> > > > > asked the wrong way.
>> >> > > > > Can you please clarify other things.
>> >> > > > >
>> >> > > > > Out of 8 node only 3 of them are getting utilized, reading
the
>> >> data
>> >> > > from
>> >> > > > > Kafka , does it mean that the Kafka partitions are set
to less
>> >> > number ?
>> >> > > > >
>> >> > > > > What if we use rescale or rebalance since it evenly
>> distributes ,
>> >> > would
>> >> > > > > that ensure maximum use of resources ?
>> >> > > > >
>> >> > > > > Regards,
>> >> > > > > Vinay Patil
>> >> > > > >
>> >> > > > > On Fri, Jul 1, 2016 at 11:09 PM, Vinay Patil <
>> >> > vinay18.patil@gmail.com>
>> >> > > > > wrote:
>> >> > > > >
>> >> > > > > > Hi,
>> >> > > > > >
>> >> > > > > > According to the documentation :
>> >> > > > > > *"**Each task is executed by one thread ,**Chaining
operators
>> >> > > together
>> >> > > > > > into tasks is a useful optimization: it reduces
the overhead
>> of
>> >> > > > > > thread-to-thread handover and buffering, and increases
>> overall
>> >> > > > throughput
>> >> > > > > > while decreasing latency"*
>> >> > > > > > So does it mean that the single box (refer below
mails)
>> >> represent
>> >> > it
>> >> > > as
>> >> > > > > a *single
>> >> > > > > > task* and  the task will be executed by single
thread only ?
>> >> > > > > >
>> >> > > > > > I am having 8 node cluster (parallelism set to
56), so what
>> is
>> >> the
>> >> > > > > correct
>> >> > > > > > way to achieve maximum CPU utilization and parallelism
? Does
>> >> > > complete
>> >> > > > > > stream chaining into a single box achieve maximum
>> parallelism ?
>> >> > > > > >
>> >> > > > > > The data we are processing is huge volume of data
(60,000
>> >> records
>> >> > per
>> >> > > > > > second), so wanted to be sure what we can correct
to achieve
>> >> better
>> >> > > > > > results.
>> >> > > > > >
>> >> > > > > > Regards,
>> >> > > > > > Vinay Patil
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On Fri, Jul 1, 2016 at 9:23 PM, Aljoscha Krettek
<
>> >> > > aljoscha@apache.org>
>> >> > > > > > wrote:
>> >> > > > > >
>> >> > > > > >> Hi,
>> >> > > > > >> yes, the window operator is stateful, which
means that it
>> will
>> >> > pick
>> >> > > up
>> >> > > > > >> where it left in case of a failure and restore.
>> >> > > > > >>
>> >> > > > > >> You're right about the graph, chained operators
are shown as
>> >> one
>> >> > > box.
>> >> > > > > >>
>> >> > > > > >> Cheers,
>> >> > > > > >> Aljoscha
>> >> > > > > >>
>> >> > > > > >> On Fri, 1 Jul 2016 at 04:52 Vinay Patil <
>> >> vinay18.patil@gmail.com>
>> >> > > > > wrote:
>> >> > > > > >>
>> >> > > > > >> > Hi,
>> >> > > > > >> >
>> >> > > > > >> > Just watched the video on Robust Stream
Processing .
>> >> > > > > >> > So when we say Window is a stateful operator
, does it
>> mean
>> >> that
>> >> > > > even
>> >> > > > > if
>> >> > > > > >> > the task manager doing the window operation
fails,  will
>> it
>> >> pick
>> >> > > up
>> >> > > > > from
>> >> > > > > >> > the state left earlier when it comes up
? (Have not read
>> >> more on
>> >> > > > state
>> >> > > > > >> for
>> >> > > > > >> > now)
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Also in one of our project when we deploy
on cluster and
>> >> check
>> >> > the
>> >> > > > Job
>> >> > > > > >> > Graph , everything is shown in one box
, why this happens
>> ?
>> >> Is
>> >> > it
>> >> > > > > >> because
>> >> > > > > >> > of chaining of streams ?
>> >> > > > > >> > So the box here represent the function
flow, right ?
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Regards,
>> >> > > > > >> > Vinay Patil
>> >> > > > > >> >
>> >> > > > > >>
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>> >
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message