flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Caizhi Weng <tsreape...@gmail.com>
Subject Re: How to create Row with RowTypeInfo
Date Mon, 15 Jul 2019 06:34:20 GMT
I forget to add the user mailing list in the response. I now add user
mailing list to the response in case other users might want to solve this
problem too...

Soheil Pourbafrani <soheil.ir08@gmail.com> 于2019年7月15日周一 上午2:56写道:

> Great!
> I got it
>
> Thanks
>
> On Sun, Jul 14, 2019 at 8:26 PM Caizhi Weng <tsreaper96@gmail.com> wrote:
>
>> Hi Soheil,
>>
>> From what I understand, as `Row` is not strongly typed, and Flink
>> currently does not specially treat the `RowTypeInfo` when extracting return
>> types of map functions, one can't simply provide a map function to
>> `dataset.map()` and expect it to return a strongly typed `RowTypeInfo`.
>>
>> But if your mapping function implements the `ResultTypeQueryable`
>> interface, Flink can directly get the result type from its
>> `getProducedType` method (see `getUnaryOperatorReturnType` in
>> `TypeExtractor`). So you can write the following code to hint Flink about
>> the return type of your mapping function.
>>
>> @Test
>> public void myTest() {
>>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());
>>
>>    DataSet<Row> dataSet = env.fromElements(
>>       Row.of(1, 1L, "a"),
>>       Row.of(2, 2L, "b"),
>>       Row.of(3, 3L, "c")
>>    );
>>    System.out.println(dataSet.getType());
>>
>>    DataSet<Row> ds = dataSet.map(row -> Row.of(row.getField(0), row.getField(1)));
>>    System.out.println(ds.getType());
>>
>>    DataSet<Row> ds2 = dataSet.map(new MyMappingFunc(dataSet));
>>    System.out.println(ds2.getType());
>> }
>>
>> private class MyMappingFunc implements MapFunction<Row, Row>, ResultTypeQueryable<Row>
{
>>
>>    private RowTypeInfo producedType;
>>
>>    public MyMappingFunc(DataSet<Row> dataSet) {
>>       RowTypeInfo ti = (RowTypeInfo) dataSet.getType();
>>       producedType = new RowTypeInfo(ti.getTypeAt(0), ti.getTypeAt(1));
>>    }
>>
>>    @Override
>>    public TypeInformation<Row> getProducedType() {
>>       return producedType;
>>    }
>>
>>    @Override
>>    public Row map(Row value) throws Exception {
>>       return Row.of(value.getField(0), value.getField(1));
>>    }
>> }
>>
>> You can see that, the type of `ds2` will be the desired Row(f0: Integer,
>> f1: Long).
>>
>> This is the method what I can currently think of. Maybe there exists a
>> simpler method, I'll ask the other Flink developers tomorrow. Maybe the
>> developers would like to specially treat the `RowTypeInfo` when deriving
>> return types of the mapping functions in the future.
>>
>> Soheil Pourbafrani <soheil.ir08@gmail.com> 于2019年7月14日周日 下午8:22写道:
>>
>>> Hi Caizhi,
>>> thanks for your reply.
>>>
>>> Maybe I didn't elaborate enough. I'm familiar with that method, maybe.
>>> My problem is supposed you have a dataset with the schema (id: Integer,
>>> name: String, register_date: Date). If I try to map it like the following:
>>>
>>> DataSet<Row> res = dataset.map(row -> {
>>>             Row temp = Row.of(
>>>                     row.getField(0),
>>>                     row.getField(1)
>>>             );
>>>
>>>             return temp;
>>>         });
>>>
>>> then the res type info will be :
>>>
>>> System.out.println(res.getType());
>>>
>>> GenericType<org.apache.flink.types.Row>
>>>
>>> But I want the typeinfo to show
>>> Row(id: Integer, name: String)
>>>
>>> Is there any way to use the schema of the dataset in map operation to
>>> generate a new dataset?
>>>
>>> On Sun, Jul 14, 2019 at 6:44 AM Caizhi Weng <tsreaper96@gmail.com>
>>> wrote:
>>>
>>>> Hi Soheil,
>>>>
>>>> There is the `toDataSet(Table, TypeInformation<?>)` method in
>>>> `BatchTableEnvironment` to which you can pass your optional RowTypeInfo.
>>>> Note that the TypeInformation you pass to it must match the field types of
>>>> the table.
>>>>
>>>> Soheil Pourbafrani <soheil.ir08@gmail.com> 于2019年7月14日周日
上午5:03写道:
>>>>
>>>>> Hi
>>>>>
>>>>> Creating a new DataSet of type Row, how can I the RowTypeInfo of the
>>>>> row?
>>>>>
>>>>> For example when I create a new dataset like the following:
>>>>>
>>>>> Row row = Row.of(1, new Timestamp(1), new Date(1));
>>>>> System.out.println(env.fromElements(row).getType());
>>>>>
>>>>> it results in:
>>>>> Row(f0: Integer, f1: Timestamp, f2: Date)
>>>>> While Flink automatically use f0, f1, ... for labeling I want to
>>>>> specify the label name of each element
>>>>>
>>>>> For example, when I create a new DataSet from a Table, it contains the
>>>>> Table schema too:
>>>>>
>>>>> DataSet res = tEnv.toDataSet(table, Row.class);
>>>>>
>>>>> Row(id: Integer, time: Timestamp, name: String, age: Integer, grade:
>>>>> Double)
>>>>>
>>>>> So is there any way to create a new DataSet<Row> with optional
>>>>> RowTypeInfo?
>>>>>
>>>>

Mime
View raw message