flink-user mailing list archives

From 淘宝龙安 <rentb...@gmail.com>
Subject How can I update data by flink-connector-elasticsearch6_2.11 in join scenario
Date Tue, 05 Nov 2019 05:53:40 GMT
Hi all,

I registered two tables.

user_info:
user_id   |  varchar  |
name      |  varchar  |

user_order:
order_id  |  varchar  |
user_id   |  varchar  |
price     |  varchar  |

Then I use the Flink Table & SQL API to join these tables:

"select user_info.user_id as user_id, name, price, order_id from user_info
join user_order on user_order.user_id = user_info.user_id"

Finally, I emit the joined data to an Elasticsearch cluster.

Then I run my Flink program and insert two users into user_info and one
order into user_order.

[image: image.png]

[image: image.png]

The result in Elasticsearch is:

[image: image.png]

My questions are:

1.   How can I update the price? When I insert another record into
user_order to update the price from 23.00 to 46.00 (order_id: 111), it
does not work correctly.

[image: image.png]

Then I get two records instead of one updated record.

[image: image.png]

It seems this program does not define the unique key fields, but I can't
find information about this in the Flink documentation.
The source code says:
"If the table does not have a key and is append-only, the keys attribute is
null."

However, this does not work in the join scenario.
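
To make the symptom in question 1 concrete, here is a minimal sketch in plain Java with no Flink or Elasticsearch dependency. It assumes that a sink without key fields writes every row under a fresh document ID, while a keyed sink derives the document ID from the key (order_id here). The class and method names are hypothetical and only illustrate the difference between the two write modes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class KeyedSinkSketch {

    /** Append-style write: every row gets a fresh document ID, so nothing is overwritten. */
    static void append(Map<String, String> index, String orderId, String price) {
        index.put(UUID.randomUUID().toString(), orderId + "," + price);
    }

    /** Upsert-style write: the document ID is the key field, so a later row replaces the earlier one. */
    static void upsert(Map<String, String> index, String orderId, String price) {
        index.put(orderId, orderId + "," + price);
    }

    public static void main(String[] args) {
        // The map stands in for an Elasticsearch index: document ID -> document body.
        Map<String, String> appendIndex = new LinkedHashMap<>();
        append(appendIndex, "111", "23.00");
        append(appendIndex, "111", "46.00");
        System.out.println("append mode documents: " + appendIndex.size());

        Map<String, String> upsertIndex = new LinkedHashMap<>();
        upsert(upsertIndex, "111", "23.00");
        upsert(upsertIndex, "111", "46.00");
        System.out.println("upsert mode documents: " + upsertIndex.size()
                + " -> " + upsertIndex.get("111"));
    }
}
```

The append case ends with two documents for order 111, which matches what I see; the upsert case ends with one document holding the latest price, which is what I expect.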

2.   Suppose the data comes from Kafka and is synced from the MySQL binlog.
     I submit my Flink job at 2019-11-05 21:00:00. How can I then join
with users that exist in MySQL but never appear in the Kafka stream? (The
Kafka offsets start from 2019-11-05 21:00:00.)
     I use kafkaConsumer011.setStartFromGroupOffsets(), not
setStartFromEarliest().
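
The situation in question 2 can be pictured with another plain Java sketch. All data and names are hypothetical: user 1 stands for a row that exists only in MySQL, user 2 for a row that arrives through Kafka after the job starts. It only shows why the join misses orders for users that were never seen on the stream, and what seeding the join state from a snapshot would change; it is not a Flink implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BootstrapJoinSketch {

    /** Inner-joins orders ("order_id,user_id,price") against the users known so far. */
    static List<String> join(Map<String, String> usersById, List<String> orders) {
        List<String> out = new ArrayList<>();
        for (String order : orders) {
            String[] f = order.split(",");
            String name = usersById.get(f[1]);
            if (name != null) {
                // Same column order as the SQL: user_id, name, price, order_id
                out.add(f[1] + "," + name + "," + f[2] + "," + f[0]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new HashMap<>();   // users already in MySQL
        snapshot.put("1", "alice");
        Map<String, String> streamOnly = new HashMap<>(); // users seen on Kafka after job start
        streamOnly.put("2", "bob");

        List<String> orders = Arrays.asList("111,1,23.00", "112,2,10.00");

        // State built from the stream alone: the order of user 1 finds no match.
        System.out.println(join(streamOnly, orders));

        // State seeded with the snapshot first, then updated from the stream.
        Map<String, String> seeded = new HashMap<>(snapshot);
        seeded.putAll(streamOnly);
        System.out.println(join(seeded, orders));
    }
}
```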

Thanks.

My code:

public class TestTwoStreamJoin {

    void testTowStreamJoin() throws Exception {
        EnvironmentSettings fsSettings =
        StreamExecutionEnvironment fsEnv =
        StreamTableEnvironment fsTableEnv =
StreamTableEnvironment.create(fsEnv, fsSettings);
        DataStream<String> d1 = fsEnv.socketTextStream("localhost", 9000);
        String[] fieldNames = new String[]{"user_id", "name"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
        DataStream<Row> t1 = getRows(d1, fieldNames, types);
        DataStream<String> d2 = fsEnv.socketTextStream("localhost", 9001);
        String[] field2 = new String[]{"order_id", "user_id", "price"};
        TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> t2 = getRows(d2, field2, types2);
        fsTableEnv.registerTable("user_info", fsTableEnv.fromDataStream(t1));
        fsTableEnv.registerTable("user_order", fsTableEnv.fromDataStream(t2));
        String joinSql = "select user_info.user_id as user_id, name, price, order_id"
                + " from user_info join user_order"
                + " on user_order.user_id = user_info.user_id";
        Table t3 = fsTableEnv.sqlQuery(joinSql);
        fsTableEnv.toAppendStream(t3, Row.class).print();
        String[] outputFields = new
        TypeInformation[] outputTypes = new
TypeInformation[]{Types.STRING, Types.STRING,
        fsTableEnv.connect(new Elasticsearch()
                .host("", 9200, "http")
        .withSchema(new Schema().schema(new TableSchema(outputFields,
        .withFormat(new Json().schema(new RowTypeInfo(outputTypes,
        fsTableEnv.sqlUpdate("insert into output " + joinSql);
    }

    // Parses each comma-separated input line into a Row with the given field names and types.
    public DataStream<Row> getRows(DataStream<String> dataStream, String[] f, TypeInformation[] t) {
        DataStream<Row> r1 = dataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] v = value.split(",");
                Row r = new Row(v.length);
                for (int i = 0; i < v.length; i++) {
                    r.setField(i, v[i]);
                }
                return r;
            }
        }).returns(new RowTypeInfo(t, f));
        return r1;
    }
}