flink-user mailing list archives

From "Katsipoulakis, Nikolaos Romanos" <kat...@cs.pitt.edu>
Subject RE: Custom Partitioning and windowing questions/concerns
Date Wed, 25 Jan 2017 17:50:27 GMT
Hello Fabian,

Thank you for your response and there is no need for apologies ☺ . As I mentioned in my
previous email, my wording seemed confusing and it was only expected that you had an incomplete
picture of my goal. Again, thank you for your help and your time.

Moving on to my plan from this point on, I understand that I might have to implement some
custom components myself (I prefer conducting my research on an actual system over regressing
back to an awful simulation). To that end, I thought of writing my own KeyedStream<T>
implementation that provides the option of using a StreamPartitioner<T> other
than the HashPartitioner<T>. This CustomKeyedStream<T> will be created by a
call to a custom method offered by DataStream<T> (let’s say) customKeyBy(int... fields,
CustomPartitioner<T>) and it will work exactly the same as DataStream<T>.keyBy(int...
fields), but with the only difference that it will receive a custom partitioner instead of
using the default hash partitioner. Do you think that this plan is feasible? I am not completely
sure whether the windowed keyed state would be affected by this design in any way.


In addition, I will consider your suggestion on extending the AbstractStreamOperator and implementing
the OneInputStreamOperator. It looks like an easier way compared to the one I described above
and I will try to dive into its implementation details.

Again, thank you very much for your help and your constructive comments.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Wednesday, January 25, 2017 12:28 PM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,
you are of course right. I forgot that ProcessFunction requires a KeyedStream. Sorry for this
advice.
The problem is that you need to implement some kind of time-based function that emits
partial counts every 10 seconds.
AFAIK, the DataStream API does not offer a built-in operator that gives you this, except for
windows and ProcessFunction.
You could try to implement your own operator by extending AbstractStreamOperator and implementing
the OneInputStreamOperator interface.
This is a fairly low-level interface but gives you access to record timestamps and watermarks.
Actually, the DataStream operators are built on this interface as well.
A custom operator is applied by calling dataStream.transform().
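
The idea of a low-level operator that sees record timestamps and watermarks can be sketched in plain Java, without any Flink classes. The class name TumblingPreAggregator is hypothetical; the method names only mirror the processElement/processWatermark callbacks of OneInputStreamOperator, and their signatures are simplified assumptions. The sketch keeps its counts in an in-memory map and ignores checkpointing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a custom pre-aggregation operator (not Flink code).
class TumblingPreAggregator {
    private final long windowSizeMs;
    // window start -> (key -> partial count), ordered by window start
    private final TreeMap<Long, Map<Integer, Integer>> windows = new TreeMap<>();

    TumblingPreAggregator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called once per record: add 1 to the count of `key` in the record's window.
    void processElement(long timestampMs, int key) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        windows.computeIfAbsent(windowStart, s -> new HashMap<>())
               .merge(key, 1, Integer::sum);
    }

    // Called when event time advances: emit "windowStart,key,count" for every
    // window whose last timestamp (start + size - 1) is covered by the watermark.
    List<String> processWatermark(long watermarkMs) {
        List<String> out = new ArrayList<>();
        while (!windows.isEmpty()
                && windows.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            Map.Entry<Long, Map<Integer, Integer>> done = windows.pollFirstEntry();
            // Sort keys so the emitted order is reproducible.
            new TreeMap<>(done.getValue()).forEach(
                (key, count) -> out.add(done.getKey() + "," + key + "," + count));
        }
        return out;
    }
}
```

With a 10-second window, records at 1000, 2000 (key 7) and 3000 (key 5) are held back until a watermark of at least 9999 arrives, at which point the partial counts for the window starting at 0 are emitted.
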
Best,
Fabian



2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos <katsip@cs.pitt.edu>:
Hello Fabian,

First, I would like to thank you for your suggestion and the additional information on determinism
and partition policies. As I mentioned in my initial email, I am new to Flink and every additional
piece of advice makes my “learning curve” less steep. In addition, I am aware that you
(and everyone else who follows this thread) might wonder why I am following this unconventional
path of performance partitioning, but I have to inform you that my use-case’s goal is of
academic nature.

Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT code, and
I read the online documentation on the Process Function API (https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html).
From my understanding, the process() transformation can be applied only on a KeyedStream<T>
and not on a DataStream<T>. Therefore, if I wanted to use a custom partition algorithm,
I would have to first make a call to partitionCustom() (DataStream<T> -> DataStream<T>),
followed by a keyBy(…) (DataStream<T> -> KeyedStream<T>), and finally apply
my first pre-aggregation step (i.e., call to process()). Concretely, my code would turn to
something like the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
                .keyBy(1)
                .process(new CustomProcessFunction(..., Time.seconds(10),...))
                .sum(2).setParallelism(N);

Unfortunately, you can understand that the above would be problematic for two reasons: First,
a call to keyBy() defeats the purpose of a custom partitioner, because the stream will be (ultimately)
partitioned based on the keys and not on my CustomPartitioner.selectChannels() method. Second,
using process() does not solve my problem, because the issue with my use-case is to avoid
calling keyBy(). If I could do that, then I might as well call window() and not use the process
API in the first place. To be more precise, if I could use a KeyedStream<T>, then I
could do the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .partitionCustom(new CustomPartitioner(...))
                .keyBy(1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2).setParallelism(N);

Therefore, I don’t think using a Process Function would solve my problem. Am I understanding
your suggestion correctly? If yes, I would be grateful if you could explain to me in more
detail. On top of that, after reading my initial email again, I believe that the intentions
for my use-case were not quite clear. Please, do not hesitate to ask me for any clarifications.

Again, thank you very much for your interest and your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Tuesday, January 24, 2017 5:15 AM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,
Flink's windows require a KeyedStream because they use the keys to manage their internal state
(each in-progress window has some state that needs to be persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic result. In your use-case,
the result of the pre-aggregation (phase 1) would not be deterministic because it would depend
on the partitioning of the input.
I would suggest implementing the pre-aggregation not with a window but with a ProcessFunction
(available in Flink 1.2-SNAPSHOT, which will be released soon).
ProcessFunction allows you to register timers which can be used to emit results every 10 seconds.
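
The point about determinism can be illustrated with a small plain-Java sketch (no Flink classes; the class PartialSums and its methods are hypothetical). It assumes one window's worth of keys and an explicit partition assignment per record: the partial counts differ between two assignments, while the merged totals do not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing that phase-1 output depends on the partitioning.
class PartialSums {

    // assignment[i] is the partition (0..n-1) that receives keys[i].
    static List<Map<Integer, Integer>> partials(int[] keys, int[] assignment, int n) {
        List<Map<Integer, Integer>> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add(new HashMap<>());
        }
        for (int i = 0; i < keys.length; i++) {
            result.get(assignment[i]).merge(keys[i], 1, Integer::sum);
        }
        return result;
    }

    // Phase-2 style merge: sum the partial counts of every partition per key.
    static Map<Integer, Integer> merge(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> totals = new HashMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((key, count) -> totals.merge(key, count, Integer::sum));
        }
        return totals;
    }
}
```

For keys {4, 4, 8, 4} on two partitions, sending every record to partition 0 and alternating between partitions 0 and 1 yield different partial maps, yet merge() returns 4 -> 3 and 8 -> 1 in both cases.
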
Hope this helps,
Fabian


2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <katsip@cs.pitt.edu>:
Hello all,

I am currently examining the effects of stream partitioning on performance for simple stateful
scenarios.

My toy application for the rest of my question will be the following: A stream of non-negative
integers, each one annotated with a timestamp, and the goal is to get the top-10 most frequent
non-negative integers on tumbling windows of 10 seconds. In other words, my input is a stream
of tuples with two fields, Tuple2<Long, Integer>(timestamp, key), where key is the non-negative
integer value, and timestamp is used to assign each event to a window. The execution plan
I am considering is to have a first phase (Phase 1), where the stream is partitioned and the
partial aggregations are processed in parallel (set parallelism to N > 1). Afterwards,
the second phase (Phase 2) involves gathering all partial aggregations on a single node (set
parallelism to 1), calculating the full aggregation for each key, ordering the keys based on
windowed frequency, and outputting the top-10 keys for each window.
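
As a rough illustration of the two-phase plan for a single window, here is a plain-Java sketch that does not use any Flink classes. The class TwoPhaseTopK and its methods are hypothetical, records are spread round-robin purely for illustration, and ties are broken by the smaller key (an assumption, since the plan above does not specify tie handling).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-window model of Phase 1 (parallel) and Phase 2 (serial).
class TwoPhaseTopK {

    // Phase 1: each of the n partitions counts the keys it received.
    static List<Map<Integer, Integer>> partialCounts(List<Integer> keys, int n) {
        List<Map<Integer, Integer>> partials = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            partials.get(i % n).merge(keys.get(i), 1, Integer::sum);
        }
        return partials;
    }

    // Phase 2: merge all partial counts, order keys by frequency, keep the top k.
    static List<Integer> topK(List<Map<Integer, Integer>> partials, int k) {
        Map<Integer, Integer> totals = new HashMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((key, count) -> totals.merge(key, count, Integer::sum));
        }
        List<Integer> keys = new ArrayList<>(totals.keySet());
        keys.sort((a, b) -> {
            int byCount = Integer.compare(totals.get(b), totals.get(a));
            return byCount != 0 ? byCount : Integer.compare(a, b);
        });
        return new ArrayList<>(keys.subList(0, Math.min(k, keys.size())));
    }
}
```

Calling topK(partialCounts(keys, N), 10) models one 10-second window; the partitioning policy only changes how the work in partialCounts is spread, not the final ranking.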

As I mentioned earlier, my goal is to compare the performance of different partitioning policies
on this toy application. Initially, I want to compare shuffle-grouping (round-robin) and hash-grouping
and then move on to different partitioning policies by using Flink’s CustomPartitioner API.
After reading Flink’s documentation, I managed to develop the toy application using hash-partitioning.
Below, I present the different parts of my code:

// Phase 0: input setup
DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                        return event.f0;
                    }
                })
                .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

In Phase 0, I collect the input stream from an in-memory list, define the event timestamp
which will be used for windowing, and extend each event with a value of 1 for counting
the appearances of each number in every window. Afterwards, for the parallel Phase 1, I use
hash partitioning by first applying the .keyBy() operation on the key of each tuple (i.e., field
1), followed by a .window() operation to assign each tuple to a window, and ending
with a .sum(). My code for (parallel) Phase 1 is the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .keyBy(1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2).setParallelism(N);

Moving on to Phase 2, to aggregate all partial results of a single window in one operator
for producing the full aggregation, ordering based on frequency, and return the top-10 keys,
I have the following:

// Phase 2: serial full aggregation and ordering, with a parallelism of 1
DataStream<String> phaseTwo = phaseOne
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values,
                                      Collector<String> out) throws Exception {
                        ...
                        List<Integer> topTenValues = ...;
                        // Join the top-10 keys into one comma-separated line per window
                        StringBuilder strBuilder = new StringBuilder();
                        for (Integer t : topTenValues)
                            strBuilder.append(t).append(",");
                        out.collect(strBuilder.toString());
                    }
                });

The previous code makes use of hash-partitioning for its parallel phase. From what I understand,
Flink allows the .window() operation only on a KeyedStream. Furthermore, the .partitionCustom()
method transforms a DataStream to a DataStream (and the same is true for .rebalance(), which
round-robins events, and .shuffle(), which distributes them randomly). Therefore, I am confused
on how I can use a shuffle policy with windows.
One idea that came to me is to provide an irrelevant field on the .keyBy() method, or define
my own KeySelector<IN, KEY> that will simulate shuffle grouping through key generation.
Unfortunately, I have two concerns regarding the previous alternatives: For the keyBy() approach,
I need to control the internal hashing mechanisms, which entails cherry-picking fields on
different workloads and performing an exhaustive search on the behavior of different random
fields (not practical). For the KeySelector<IN, KEY> approach, I need to maintain state
among different calls of getKey(), which (as far as I know) is not offered by the KeySelector<IN,
KEY> interface and I do not want to rely on external state that will lead to additional
overhead. Therefore, my first question is: how can I effectively use round-robin
grouping with windows on my toy application?
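
To make the contrast between the two policies concrete, here is a plain-Java sketch (no Flink classes; ChannelSelectors, hashChannel, RoundRobin and load are hypothetical names). It assumes integer keys and shows that a hash policy is a pure function of the key, whereas round-robin needs a counter that survives between calls, which is the state concern raised above.

```java
// Hypothetical model of two channel-selection policies (not Flink code).
class ChannelSelectors {

    // Hash policy: stateless; the same key always maps to the same channel.
    static int hashChannel(int key, int numChannels) {
        return Math.floorMod(Integer.hashCode(key), numChannels);
    }

    // Round-robin policy: ignores the key but must remember the last channel used.
    static final class RoundRobin {
        private final int numChannels;
        private int next = 0;

        RoundRobin(int numChannels) {
            this.numChannels = numChannels;
        }

        int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    // Counts how many records each channel receives under the chosen policy.
    static int[] load(int[] keys, int numChannels, boolean roundRobin) {
        int[] load = new int[numChannels];
        RoundRobin rr = new RoundRobin(numChannels);
        for (int key : keys) {
            int channel = roundRobin ? rr.selectChannel() : hashChannel(key, numChannels);
            load[channel]++;
        }
        return load;
    }
}
```

On a skewed input such as {1, 1, 1, 1, 1, 1, 2, 3} with four channels, the hash policy sends six records to a single channel, while round-robin spreads two records to each channel at the cost of scattering equal keys across channels.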

The bigger point I am trying to address revolves around custom partitioning policies and windows
in general. My understanding is that the benefit of a custom partitioning policy is to have
the ability to control the partitioning process based on a pre-defined set of resources (e.g.,
partitions, task slots etc.). Hence, I am confused on how I would be able to use partitionCustom()
followed by .window() on the (parallel) phase one, to test the performance of different execution
plans (i.e., partitioning policies).

I apologize for the long question, but I believe that I had to provide enough details for
the points/questions I currently have (highlighted with bold). Thank you very much for your
time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


