flink-user mailing list archives

From: rss rss <rssde...@gmail.com>
Subject: Kafka and Flink's partitions
Date: Thu, 25 Aug 2016 18:21:24 GMT
Hello,

  I want to implement the processing scheme shown in the following diagram:
a count of the number of unique users per specified time interval, under
the assumption that we have > 100k events per second and > 100M unique
users:

[diagram not preserved in the archive]

 I have one Kafka topic of events, partitioned by hash(userId) %
partitionsNum:
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java.
I have also prepared a runnable example:
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
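
 For illustration, here is a minimal sketch of such a partitioner against
the plain Kafka producer Partitioner API. This is my own sketch, not the
linked KafkaPartitioner.java, and it assumes the record key is the userId
string:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Routes all events with the same userId to the same Kafka partition:
// partition = hash(userId) % partitionsNum.
public class UserIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // key is assumed to be the non-null userId string; mask the sign
        // bit so the result stays non-negative even for Integer.MIN_VALUE
        // hash codes.
        return (key.hashCode() & 0x7fffffff) % partitions.size();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}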

 The whole project is available at
https://github.com/rssdev10/flink-kafka-streaming/ ; see that page for how
to run the data generator and the test.

 Basic assumption: to count unique identifiers I need to keep them in
memory in a Set<String>, but this data structure grows to dozens of GB
(roughly 100M entries times on the order of a hundred bytes of String and
HashSet overhead each). So I need to partition the data by identifier to
shrink the per-task state, and then collect only the already-computed
counts per specified time interval, e.g. every hour.
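
 To make that concrete, here is a tiny plain-Java sketch (my illustration,
not code from the repo) of why the per-partition counts can simply be
summed: the same hash(userId) % partitionsNum rule puts each identifier in
exactly one partition, so the per-partition sets are disjoint:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DisjointCountDemo {
    public static void main(String[] args) {
        int partitionsNum = 4;
        List<String> userIds = IntStream.range(0, 1000)
                .mapToObj(i -> "user-" + i)
                .collect(Collectors.toList());

        // Partition the ids the same way the Kafka partitioner does.
        Map<Integer, Set<String>> partitions = new HashMap<>();
        for (String id : userIds) {
            int p = (id.hashCode() & 0x7fffffff) % partitionsNum;
            partitions.computeIfAbsent(p, k -> new HashSet<>()).add(id);
        }

        // Each partition reports only its count; the global number of
        // unique users is just the sum, because the sets never overlap.
        long total = partitions.values().stream().mapToLong(Set::size).sum();
        System.out.println(total); // prints 1000, the true unique count
    }
}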

 Questions:

   1. Flink's partitioning logic is quite opaque. The window operator
   requires a keyed stream. Does it mean that when I do

   eventStream.keyBy(event -> event.partition(partNum));

   with the same partitioner as is used for Kafka, Flink preserves the
   original partitions? I want to avoid any repartitioning.
   2. Then I do

   WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
           userIdKeyed.timeWindow(Time.seconds(windowDurationTime));

   DataStream<ProductAggregator> uniqUsers = uniqUsersWin
           .trigger(ProcessingTimeTrigger.create())
           .fold(new UniqAggregator(),
                   (FoldFunction<Event, UniqAggregator>) (accumulator, value) -> {
               accumulator.uniqIds.add(value.getUserId());

               accumulator.registerEvent(value);

               return accumulator;
           });

   does it mean that I have only one partition?
   3. Next, I want to collect the partial aggregation results. I'm using a
   custom trigger
   https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/CountOrTimeTrigger.java
   which fires either once the number of collected partial aggregates reaches
   the number of Kafka partitions, or after an emergency timeout if not
   enough aggregates have arrived. The aggregation code is:

   AllWindowedStream<ProductAggregator, TimeWindow> combinedUniqNumStream =
           uniqUsers
                   .timeWindowAll(Time.seconds(emergencyTriggerTimeout))
                   .trigger(PurgingTrigger.of(CountOrTimeTrigger.of(partNum)));

   combinedUniqNumStream
           .fold(new ProductAggregator(),
                   (FoldFunction<ProductAggregator, ProductAggregator>) (accumulator, value) -> {
               accumulator.value += value.value;

               accumulator.summarize(value);

               return accumulator;
           });

   But sometimes I see an incorrect number of unique identifiers, probably
   because the partial aggregates are skewed in time; see the toy simulation
   after this list. The test generates no more than 1000 identifiers, and the
   wrong counts are easy to observe when the test is run after preloading
   messages into Kafka.
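
 Here is a toy plain-Java simulation (not Flink code, just my hypothesis)
of the skew I suspect: if one subtask lags behind by one window, a firing
based purely on counting partNum partial aggregates mixes aggregates from
two different windows, and the totals come out wrong:

public class SkewDemo {
    public static void main(String[] args) {
        int partNum = 3;
        // Partial aggregates as (windowId, count) pairs, in arrival order.
        // The third subtask lags one window behind the other two.
        int[][] arrivals = {
            {1, 10}, {1, 20}, {2, 30},   // 1st firing mixes windows 1 and 2
            {1, 15}, {2, 40}, {2, 25},   // 2nd firing mixes them again
        };
        int buffered = 0;
        int sum = 0;
        for (int[] a : arrivals) {
            sum += a[1];
            buffered++;
            if (buffered == partNum) {   // count-based firing, analogous to
                                         // CountOrTimeTrigger.of(partNum)
                System.out.println("fired with sum " + sum);
                buffered = 0;
                sum = 0;
            }
        }
        // The correct per-window totals are 45 (window 1) and 95 (window 2),
        // but the two firings report 60 and 80.
    }
}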


PS: I found some information at
http://data-artisans.com/kafka-flink-a-practical-how-to/ and
https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana
but unfortunately these articles don't answer how to build the scheme
described above.


Cheers

