flink-user mailing list archives

From Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Subject Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?
Date Thu, 13 Jun 2019 16:03:50 GMT
Hmm... it seems it is my turn to implement all of this using the Table
API.
Thanks, Rong!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Thu, Jun 13, 2019 at 6:00 PM Rong Rong <walterddr@gmail.com> wrote:

> Hi Felipe,
>
> Hequn is right. Your problem is better solved with Table API-level code
> instead of dealing with it in the DataStream API; you will have more
> Flink library support to achieve your goal.
>
> In addition, the Flink Table API also supports user-defined aggregate
> functions [1], which you can use for your HyperLogLog-based
> approximation. In fact, the interface is similar to the
> AggregateFunction in the DataStream API [2].
>
> --
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#aggregation-functions
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#aggregatefunction
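For reference, the DataStream AggregateFunction mentioned in [2] follows a createAccumulator / add / merge / getResult contract. Below is a plain-Java sketch of that shape (no Flink dependency; the class and method names merely mirror Flink's interface and are not the actual Flink types), using an exact HashSet accumulator that a HyperLogLog sketch could later replace:

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch of the AggregateFunction contract referenced in [2].
// No Flink dependency here: the method names simply mirror Flink's
// interface. The accumulator is an exact HashSet; a HyperLogLog sketch
// could be swapped in for an approximate distinct count.
public class DistinctCountAgg {

    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    // Called once per element: fold it into the accumulator incrementally.
    public Set<String> add(String word, Set<String> acc) {
        acc.add(word);
        return acc;
    }

    // Merge partial accumulators (e.g. when windows are merged).
    public Set<String> merge(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }

    // Emit the final result when the window fires.
    public long getResult(Set<String> acc) {
        return acc.size();
    }
}
```

The Table API's user-defined aggregate functions [1] follow a very similar accumulate/merge/getValue shape.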
>
> On Thu, Jun 13, 2019 at 8:55 AM Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> Hi Hequn,
>> indeed the ReduceFunction is better than the ProcessWindowFunction. I
>> replaced it and could verify the performance improvement [1]. Thanks for
>> that! I will try a distinct count with the Table API.
>>
>> The issue I am facing is that I want to use a HyperLogLog in a UDF for
>> the DataStream API, so that I can compute an approximate distinct count
>> inside a window, as I did here [2]. After building my UDF, I want to have
>> my own operator that processes this distinct-count approximation. So I am
>> not sure whether I can implement my own operator for the Table API. Can I?
>>
>> [1]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
>> [2]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
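For illustration, the core of a HyperLogLog sketch like the one used in [2] is quite small. The class below is a from-scratch sketch (FNV-1a hash with a finalizing mixer, precision p = 14), not the library implementation the linked example uses:

```java
import java.nio.charset.StandardCharsets;

// Minimal from-scratch HyperLogLog, for illustration only -- the linked
// example [2] uses a library implementation. p = 14 gives 2^14 registers
// and roughly 1% standard error.
public class HllSketch {
    private static final int P = 14;
    private static final int M = 1 << P;
    private final byte[] regs = new byte[M];

    public void add(String item) {
        long h = hash(item);
        int idx = (int) (h >>> (64 - P));                  // top P bits pick a register
        int rank = Long.numberOfLeadingZeros(h << P) + 1;  // position of first 1-bit in the rest
        if (rank > regs[idx]) regs[idx] = (byte) rank;
    }

    public long estimate() {
        double sum = 0;
        int zeros = 0;
        for (byte r : regs) {
            sum += Math.pow(2.0, -r);
            if (r == 0) zeros++;
        }
        double alpha = 0.7213 / (1.0 + 1.079 / M);
        double e = alpha * M * M / sum;                    // raw HLL estimate
        if (e <= 2.5 * M && zeros > 0) {                   // small-range (linear counting) correction
            e = M * Math.log((double) M / zeros);
        }
        return Math.round(e);
    }

    // FNV-1a 64-bit hash plus a finalizing mixer for better bit diffusion.
    private static long hash(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33; h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33; h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }
}
```

A sketch like this fits naturally as the accumulator of a window AggregateFunction, since two register arrays can be merged by taking the element-wise maximum.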
>>
>> Thanks!
>> Felipe
>>
>>
>>
>> On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng <chenghequn@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> From your code, I think you want the "count distinct" result rather
>>> than the "distinct count"; the two have different meanings.
>>>
>>> To improve the performance, you can replace
>>> your DistinctProcessWindowFunction with a DistinctProcessReduceFunction.
>>> A ReduceFunction can aggregate the elements of a window incrementally,
>>> while a ProcessWindowFunction cannot aggregate incrementally: its
>>> elements must be buffered internally until the window is considered
>>> ready for processing.
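Flink aside, the difference can be sketched in plain Java: a reduce folds each element into a single running accumulator, so the window state stays constant-size instead of buffering every element until the trigger fires:

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration of incremental (reduce-style) aggregation:
// the "window state" is one running value, updated per arriving element,
// rather than a buffer of all elements awaiting the window trigger.
public class IncrementalReduce {

    // Returns null for an empty window, mirroring "nothing to emit".
    public static <T> T reduce(List<T> windowElements, BinaryOperator<T> fn) {
        T acc = null;
        for (T e : windowElements) {
            acc = (acc == null) ? e : fn.apply(acc, e);  // fold element into accumulator
        }
        return acc;
    }
}
```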
>>>
>>> > Flink does not have a built-in operator which does this computation.
>>> Flink does have built-in operators that solve your problem: you can use
>>> the Table API & SQL. The code looks like this:
>>>
>>> public static void main(String[] args) throws Exception {
>>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>>    DataStream<String> ds = env.socketTextStream("localhost", 9000);
>>>    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>>>
>>>    SplitTableFunction splitFunc = new SplitTableFunction();
>>>    tableEnv.registerFunction("splitFunc", splitFunc);
>>>    Table result = tableEnv.scan("sourceTable")
>>>          .joinLateral("splitFunc(line) as word")
>>>          .window(Tumble.over("5.seconds").on("proctime").as("w"))
>>>          .groupBy("w")
>>>          .select("count.distinct(word), collect.distinct(word)");
>>>
>>>    tableEnv.toAppendStream(result, Row.class).print();
>>>    env.execute();
>>> }
>>>
>>> Detail code can be found here[1].
>>>
>>> At the same time, you can use a two-stage window aggregation to improve
>>> the performance, i.e., the first window aggregate is used to distinct
>>> the words and the second window is used to produce the final results.
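The two-stage idea can be sketched in plain Java (the stage split below is an assumed illustration, not code from the linked commit): stage one dedupes words per parallel partition, and stage two merges the small partial sets:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Two-stage distinct count, Flink aside: stage one runs in parallel and
// dedupes words per partition (analogous to keyBy(word) + a first window);
// stage two merges the already-deduped partials in a single task.
public class TwoStageDistinct {

    // Stage 1: each partition dedupes its own slice of the stream.
    public static List<Set<String>> stageOne(List<String> words, int parallelism) {
        List<Set<String>> partials = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) partials.add(new HashSet<>());
        for (String w : words) {
            int partition = Math.floorMod(w.hashCode(), parallelism); // like keyBy(word)
            partials.get(partition).add(w);
        }
        return partials;
    }

    // Stage 2: merge the deduped partials -- far fewer records than the raw
    // stream, so the non-parallel final step stays cheap.
    public static long stageTwo(List<Set<String>> partials) {
        Set<String> merged = new HashSet<>();
        for (Set<String> p : partials) merged.addAll(p);
        return merged.size();
    }
}
```

The same shape works with HyperLogLog accumulators in place of the sets, since HLL registers merge cheaply.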
>>>
>>> Document about Table API and SQL can be found here[2][3].
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>>>
>>>
>>> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> Hi Rong, I implemented my solution using a ProcessWindowFunction
>>>> with timeWindow and a ReduceFunction with timeWindowAll [1]. The first
>>>> window uses parallelism, and the second window merges everything in the
>>>> reducer. I guess this is the best approach to a distinct count. Would
>>>> you suggest any improvements?
>>>>
>>>> [1]
>>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> thanks for your answer. If I understood correctly, the option is to
>>>>> use ProcessFunction [1], since it has the onTimer() method, and not
>>>>> ProcessWindowFunction [2], which does not. I need onTimer() to call
>>>>> out.collect(...) on the Collector<OUT> in order to emit a single value
>>>>> from my distinct-count function.
>>>>>
>>>>> Does that sound reasonable?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>>>
>>>>> Kind Regards,
>>>>> Felipe
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong <walterddr@gmail.com> wrote:
>>>>>
>>>>>> Hi Felipe,
>>>>>>
>>>>>> there are multiple ways to do DISTINCT COUNT in the Table/SQL API; in
>>>>>> fact, there is already a recent thread about it [1]. Based on the
>>>>>> description you provided, it seems like it might be the better API
>>>>>> level to use.
>>>>>>
>>>>>> To answer your questions:
>>>>>> - You should be able to use other TimeCharacteristic settings. You
>>>>>> might want to try ProcessWindowFunction and see if it fits your use
>>>>>> case.
>>>>>> - I am not sure I fully understand the question, but your keyBy should
>>>>>> be done on your distinct key (or a combo key); if you key correctly,
>>>>>> then yes, all messages with the same key are processed by the same
>>>>>> TaskManager thread.
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>>>>
>>>>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>>>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have implemented a Flink data stream application to compute a
>>>>>>> distinct count of words. Flink does not have a built-in operator
>>>>>>> which does this computation. I used a KeyedProcessFunction and I am
>>>>>>> saving the state in a ValueState descriptor.
>>>>>>> Could someone check whether my implementation is the best way of
>>>>>>> doing it? Here is my solution:
>>>>>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>>>>>
>>>>>>> There are some points that I do not fully understand:
>>>>>>> - I could only use TimeCharacteristic.IngestionTime.
>>>>>>> - I split the words using "Tuple2<Integer, String>(0, word)", so I
>>>>>>> always have the same key (0). As I understand it, all the events will
>>>>>>> be processed on the same TaskManager, which will not achieve
>>>>>>> parallelism if I am in a cluster.
>>>>>>>
>>>>>>> Kind Regards,
>>>>>>> Felipe
>>>>>>>
>>>>>>
