flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunfulin <sunfulin0...@163.com>
Subject Flink SQL Count Distinct performance optimization
Date Wed, 08 Jan 2020 07:39:17 GMT
Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. With one scenario
I'm trying to count(distinct deviceID) over about 100GB data set in realtime, and aggregate
results with sink to ElasticSearch index. I met a severe performance issue when running my
flink job. Wanner get some help from community.


Flink version : 1.8.2
Running on yarn with 4 yarn slots per task manager. My flink task parallelism is set to be
10, which is equal to my kafka source partitions. After running the job, I can observe high
backpressure from the flink dashboard. Any suggestions and kind of help is highly appreciated.



running sql is like the following:
    

INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  from

(

    SELECT

     aggId,

     pageId,

     statkey,

     COUNT(DISTINCT deviceId) as cnt

     FROM

     (

         SELECT

         'ZL_005' as aggId,

         'ZL_UV_PER_MINUTE' as pageId,

         deviceId,

         ts2Date(recvTime) as statkey

         from

         kafka_zl_etrack_event_stream

     )

     GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey
















Best
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message