flink-user mailing list archives

From John Mathews <jmathews3...@gmail.com>
Subject Re: Blink Planner Retracting Streams
Date Fri, 19 Jun 2020 00:15:11 GMT
Below is a basic unit test of what we are trying to achieve. Essentially, we
are trying to convert a retracting stream into a RetractStreamTableSink. That
is easy to do with CRow in the original Flink planner, but it seems to be very
difficult with the Blink planner.

The test fails with:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [key: STRING, id: STRING]

It succeeds, however, if you uncomment the CRow lines and run it with the
original table planner.
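To see why the view built from `tuple2DataStream` in the test below cannot be written to `sink2`, the two schemas in the exception can be reproduced with a toy model in plain Java. This is a sketch under stated assumptions, not Flink code. `SchemaField` and `SchemaModel` are hypothetical names. `typesMatch` assumes, as the wording of the exception suggests, that the check compares field types by position and ignores names. The model treats `Tuple2<Boolean, Row>` as two positional fields (the flag and a nested row) and treats a CRow-style wrapper as exposing only the row's own fields.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a table field; NOT Flink's TableSchema API.
final class SchemaField {
    final String name;
    final String type;

    SchemaField(String name, String type) {
        this.name = name;
        this.type = type;
    }
}

final class SchemaModel {
    // Renders a schema the way the exception message prints it: [name: TYPE, ...]
    static String render(List<SchemaField> fields) {
        return fields.stream()
                .map(f -> f.name + ": " + f.type)
                .collect(Collectors.joining(", ", "[", "]"));
    }

    // Tuple2<Boolean, Row>: the change flag and the nested row become two positional fields.
    static List<SchemaField> tuple2Schema(List<SchemaField> rowFields) {
        String nested = rowFields.stream()
                .map(f -> "`" + f.name + "` " + f.type)
                .collect(Collectors.joining(", ", "ROW<", ">"));
        return List.of(new SchemaField("f0", "BOOLEAN"), new SchemaField("f1", nested));
    }

    // CRow-style (assumed): the flag travels outside the schema, so only the row's fields remain.
    static List<SchemaField> cRowStyleSchema(List<SchemaField> rowFields) {
        return rowFields;
    }

    // Assumed validation rule: compare field types position by position, ignoring names.
    static boolean typesMatch(List<SchemaField> query, List<SchemaField> sink) {
        if (query.size() != sink.size()) {
            return false;
        }
        for (int i = 0; i < query.size(); i++) {
            if (!query.get(i).type.equals(sink.get(i).type)) {
                return false;
            }
        }
        return true;
    }
}
```

Consider a row of two STRING fields named `f0` and `f1`. Under these assumptions, `SchemaModel.render(SchemaModel.tuple2Schema(row))` produces the same text as the "Query schema" line. Its types do not line up with `[key: STRING, id: STRING]`, while the CRow-style schema's types do.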

Any thoughts on how we can accomplish this?

public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment executionEnvironment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment =
            StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "1");

    SingleOutputStreamOperator<Row> source = ...;

    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");

    RowTypeInfo rowTypeInfo = new RowTypeInfo(Types.STRING(), Types.STRING());
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
            tableEnvironment.toRetractStream(outputTable, rowTypeInfo);

    // The block below works with the old Flink planner but fails with the Blink
    // planner, because Blink treats all non-tuple types as POJOs.
    // SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));

    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a retracting table sink
    TableSchema.Builder schemaBuilder = TableSchema.builder();
    schemaBuilder.field("key", DataTypes.STRING());
    schemaBuilder.field("id", DataTypes.STRING());
    TableSchema schema = schemaBuilder.build();
    RetractSink retractTableSink = new RetractSink(new ...);
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the output to the sink
    ...
}

private static class RetractSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> delegate;

    RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    public TypeInformation<Row> getRecordType() {
        return delegate.getOutputType();
    }

    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
    }

    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    }

    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Keep only accumulate messages and unwrap the Row, then hand the
        // resulting append-only stream to the delegate sink.
        DataStream<Row> filteredAndMapped =
                dataStream.flatMap(new TupleMapper()).returns(getRecordType());

        return delegate.consumeDataStream(filteredAndMapped);
    }

    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames,
            TypeInformation<?>[] fieldTypes) {
        throw new UnsupportedOperationException();
    }
}

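private static final class TupleMapper implements
        FlatMapFunction<Tuple2<Boolean, Row>, Row> {
    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
        // Forward accumulate messages (flag = true); drop retractions (flag = false)
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}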
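The filtering done by `TupleMapper` can be checked in isolation with a plain-Java model of a retract stream. This is a hypothetical sketch, not Flink API. `Change` stands in for `Tuple2<Boolean, Row>`, and a `List<String>` stands in for `Row`. `dropRetractions` forwards only messages whose flag is `true`, as `TupleMapper` does. For comparison, `materialize` instead applies each retraction to a multiset of rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Tuple2<Boolean, Row>: a change flag plus the row's values.
final class Change {
    final boolean add;        // true = accumulate, false = retract
    final List<String> row;

    Change(boolean add, List<String> row) {
        this.add = add;
        this.row = row;
    }
}

final class RetractDemo {
    // Same filtering as TupleMapper: forward the row only when the flag is true.
    static List<List<String>> dropRetractions(List<Change> changes) {
        List<List<String>> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                out.add(c.row);
            }
        }
        return out;
    }

    // Applies each change to a multiset of rows, so a retraction cancels an earlier add.
    static Map<List<String>, Integer> materialize(List<Change> changes) {
        Map<List<String>, Integer> counts = new HashMap<>();
        for (Change c : changes) {
            counts.merge(c.row, c.add ? 1 : -1, (oldCount, delta) -> {
                int updated = oldCount + delta;
                return updated == 0 ? null : updated; // returning null removes the entry
            });
        }
        return counts;
    }
}
```

For an insert-only query such as `select id, key from table1`, every message carries `true`, so both approaches agree. An updating query is different, for example one with a GROUP BY aggregation. Its retract stream emits a retraction of the old result followed by the new one, and a filter like `TupleMapper` forwards both the superseded row and its replacement to the append-only delegate.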

On Thu, Jun 18, 2020 at 10:21 AM John Mathews <jmathews3773@gmail.com> wrote:

> So the difference between Tuple2<Boolean, Row> and CRow is that CRow has a
> special TypeInformation defined here:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32
> that returns the TypeInfo of the underlying row, whereas the
> TypeInformation for Tuple2 returns type information that contains the
> retraction boolean plus a nested type info for the row. So all
> downstream operations that rely on seeing just the row type info now break
> for us.
> On Wed, Jun 17, 2020 at 9:23 PM Jark Wu <imjark@gmail.com> wrote:
>> Hi John,
>> Maybe I misunderstand something, but CRow doesn't have the `getSchema()`
>> method. You can call getSchema() on the Table; this also works if you
>> convert the table into Tuple2<Boolean, Row>.
>> Actually, there is no big difference between CRow and Tuple2<Boolean,
>> Row>; they both wrap the change flag and the Row.
>> Best,
>> Jark
>> On Thu, 18 Jun 2020 at 06:39, John Mathews <jmathews3773@gmail.com>
>> wrote:
>>> Hello Godfrey,
>>> Thanks for the response!
>>> I think the problem with Tuple2 is this: if my understanding of how CRow
>>> worked is correct, CRow's getSchema() returned the underlying schema of
>>> the row it contained. Tuple2 doesn't do that, so it changes/breaks a lot
>>> of our downstream code that relies on the TableSchema returning the
>>> underlying row's schema rather than a Tuple schema.
>>> Any thoughts on that issue?
>>> On Wed, Jun 17, 2020 at 12:16 AM godfrey he <godfreyhe@gmail.com> wrote:
>>>> hi John,
>>>> You can use Tuple2[Boolean, Row] in place of CRow; the
>>>> StreamTableEnvironment#toRetractStream method returns
>>>> DataStream[(Boolean, T)].
>>>> The code looks like:
>>>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>>>>   override def map(value: (Boolean, Row)): R = ...
>>>> })
>>>> Bests,
>>>> Godfrey
>>>> On Wed, Jun 17, 2020 at 12:13 PM John Mathews <jmathews3773@gmail.com> wrote:
>>>>> Hello,
>>>>> I am working on migrating from the flink table-planner to the new
>>>>> blink one, and one problem I am running into is that Blink doesn't seem
>>>>> to have a concept of a CRow, unlike the original table-planner.
>>>>> I am therefore struggling to figure out how to properly convert a
>>>>> retracting stream to a SingleOutputStreamOperator when using just the
>>>>> planner libraries.
>>>>> E.g. in the old planner I could do something like this:
>>>>> SingleOutputStreamOperator<CRow> stream =
>>>>> tableEnvironment.toRetractStream(table, typeInfo)
>>>>>                     .map(value -> new CRow(value.f1, value.f0));
>>>>> but without the CRow, I'm not sure how to accomplish this.
>>>>> Any suggestions?
>>>>> Thanks!
>>>>> John
