flink-user-zh mailing list archives

From "wind.fly.vip@outlook.com" <wind.fly....@outlook.com>
Subject Question: why do different Flink SQL jobs consuming the same Kafka table (with group.id specified) output the same data?
Date Fri, 22 May 2020 08:21:18 GMT
Hi all,
        I'm using Flink 1.10.0. Under a Hive catalog, I created a table mapped to a Kafka topic:
        CREATE TABLE x.log.yanfa_log (
    dt TIMESTAMP(3),
    conn_id STRING,
    sequence STRING,
    trace_id STRING,
    span_info STRING,
    service_id STRING,
    msg_id STRING,
    servicename STRING,
    ret_code STRING,
    duration STRING,
    req_body MAP<STRING,STRING>,
    res_body MAP<STRING,STRING>,
    extra_info MAP<STRING,STRING>,
    WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'x-log-yanfa_log',
    'connector.properties.bootstrap.servers' = '******:9092',
    'connector.properties.zookeeper.connect' = '******:2181',
    'connector.properties.group.id' = 'testGroup',
    'connector.startup-mode' = 'group-offsets',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.fail-on-missing-field' = 'true'
);
The program that consumes table x.log.yanfa_log is as follows:
// Register the Hive catalog that holds the Kafka-backed table
Catalog myCatalog = new HiveCatalog("x", "default",
                "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
// Query the Kafka table and print the resulting append stream
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();

        I then started 2 jobs from this same program, and both jobs output identical results. My question: isn't each partition of a Kafka
topic supposed to be consumed by at most one consumer within a consumer group? Why do the 2 jobs output the same results?
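For context on the behavior above: Flink's Kafka connector does not participate in Kafka's broker-side consumer-group rebalancing. Each job assigns all topic partitions to its own source subtasks deterministically, and group.id is only used for committing offsets (and for the group-offsets startup mode), so two independent jobs each read every partition. A minimal sketch of such per-job assignment, using a simplified round-robin scheme (the class PartitionAssignmentSketch and its modulo rule are illustrative, not Flink's actual internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Each job independently maps every partition to one of its own
    // source subtasks; no broker-side coordination is involved.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.computeIfAbsent(p % parallelism, k -> new ArrayList<>()).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Two separate jobs reading the same 4-partition topic, parallelism 2:
        Map<Integer, List<Integer>> jobA = assign(4, 2);
        Map<Integer, List<Integer>> jobB = assign(4, 2);
        // Both jobs cover ALL partitions, so both see all the data.
        System.out.println("job A: " + jobA);
        System.out.println("job B: " + jobB);
    }
}
```

Because each job runs this assignment on its own, both jobs consume every partition in full, which is why they print identical data despite sharing a group.id.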