flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shi Quan <qua...@outlook.com>
Subject RE: 回复:RE: flink并行度设置问题
Date Tue, 23 Apr 2019 05:50:59 GMT
减少网络IO传输肯定会降低处理延迟。同时,从你的描述上看,是不是DB写入的性能跟不上?加大sink的并发呢?

其实没有必要自定义keyselecter,flink会根据key值做hash处理,将数据分散到各Operator。





Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: 1900 <575209351@qq.com>
Sent: Tuesday, April 23, 2019 11:27:47 AM
To: user-zh
Subject: 回复:RE: flink并行度设置问题

很高兴能解答,chain的文问题也有想过,但暂时还没研究相关文档,是不是并发度都一样的话,就能最大化性能?


数据混洗,是不是就是尽量让整个流程的算子尽量在一台机器,一个taskmanager的一个slot中进行


jobmanager 2G,每个taskmanager4g,每个taskmanager 4个slot,每台机器4核8G
现在的瓶颈就在timewindow聚集数据,不能横向扩展,提高整体性能(并发度设置更大)


其他 具体如前面4,5问题


现贴出大概的代码


总体调用


// 并发度
Constant.PARALLELISM = 8


DataStream<List<OrderInfo>> thisData = lastData.keyBy(new MyKeySelector()).timeWindow(Time.milliseconds(5000))
                .process(new MyWindowFunction())
                .setParallelism(Constant.PARALLELISM);


thisData.addSink(new SinkToDb()).setParallelism(Constant.PARALLELISM);


自定义KeySelector


public class MyKeySelector implements KeySelector<OrderInfo, String> {


    @Override
    public String getKey(OrderInfo value) throws Exception {


        String uniqueId = value.getBusiCode() + value.getOrderNo();


        int mod = Math.abs(uniqueId.hashCode() % Constant.PARALLELISM);


        return String.valueOf(mod);
    }
}




自定义ProcessWindowFunction


public class MyWindowFunction extends ProcessWindowFunction<OrderInfo, List<OrderInfo>,
String, TimeWindow> {


    @Override
    public void process(String key, Context context, Iterable<OrderInfo> elements, Collector<List<OrderInfo>>
out) throws Exception {
        ArrayList<OrderInfo> OrderInfoArrayList = Lists.newArrayList(elements);
        out.collect(OrderInfoArrayList);
    }
}







------------------ 原始邮件 ------------------
发件人: "Shi Quan"<quan.s@outlook.com>;
发送时间: 2019年4月23日(星期二) 上午10:02
收件人: "user-zh@flink.apache.org"<user-zh@flink.apache.org>;

主题: RE: flink并行度设置问题



这个问题好全面,本质上是一个如何提高性能的问题。

首先,我们明确整条流中性能的瓶颈处在哪里?同时也要考虑到数据混洗的问题。



针对你的一些疑问:

  1.  不是并发度一样才好,因为各个operator的复杂度不一样,简单的可以认为复杂的可以给更高的并发度。这个问题说大了会很复杂,会涉及到是不是一个chain,会不会带来额外数据混洗等问题;
  2.  从kafka的模型看,you are right啊,如果source不是瓶颈,就没必要并发度拉这么高;
  3.  kafka source的并发度降低时一种节省资源的方式。建议重点去解决瓶颈问题
  4.  & 5 介意结合代码和数据情况说明么?





Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: 1900 <575209351@qq.com>
Sent: Tuesday, April 23, 2019 9:47:08 AM
To: user-zh
Subject: flink并行度设置问题

目前flink用的版本是社区版1.7.2,hadoop版本是2.8.5,采用flink on yarn ha部署,服务启动采用
run a job on yarn


现在简单流程是,从kafka读取数据,然后window时间窗口聚合(假如5秒钟窗口,形成一个list数据,然后输出),然后写入db


1.每个算子的并行度是不是设置一样比较好?
2.读取kafka算子的并发度,理论上上设置于topic的分区一致,假设topic是8个分区,并发度设置8最好
,是不是这样能最大化性能
3.同第二点,现在发现读取kafka的速度真的非常快,往往瓶颈在后面,现在瓶颈就在window时间窗口聚合算子上,
假设调小读取kafka数据的并发度,假如调成4,背压还会体现在读取kafka数据这个算子上,是不是读取kafka只能设置成1了?
窗口的聚合算子不管怎么调大,都会反压到读取kafka数据的算子,请问是哪边有什么问题吗?
4.现在窗口聚合的分组的key是自定义的,选取唯一ID,通过hash对并行度求模,然后取绝对值
拆分开看,只有读取kafka数据,窗口合并两个算子,并发度会很大
加入入DB的算子,并发度就大幅降低,但是拆个窗口聚合的背压一点都没有,背压还是体现在读取kafka数据算子上,请问是什么原因
5.上述的自定义分组key是否有什么问题?从监控页面观察到,现在窗口聚合的slot没有完全使用掉,假设设置8个并行度,实际只有6个子任务在处理数据,
有2个子任务永远没有获取到数据,而且有另外两个子任务数据是其他的两倍

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message