flink-user mailing list archives

From Greg Hogan <c...@greghogan.com>
Subject Re: TIMESTAMP TypeInformation
Date Thu, 27 Oct 2016 13:22:34 GMT
Fabian,

Should we instead add this as a registered TypeInfoFactory?

Greg

On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Yes, I think you are right.
> TypeInfoParser needs to be extended to parse the java.sql.* types into the
> corresponding TypeInfos.
>
> Can you open a JIRA for that?
>
> Thanks, Fabian
>
> 2016-10-27 9:31 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
>
>> Hi,
>>
>>
>>
>> In the meantime I dug deeper into this, and I think I actually found a bug.
>>
>>
>>
>> The scenario that I was trying to describe is something like this:
>>
>> 1. You have a generic DataStream of Tuple (alternatively I could move to
>> Row, I guess) and you get the data from whatever stream (e.g., Kafka,
>> network socket…)
>>
>> 2. In the map/flatMap function you parse and instantiate the tuple
>> generically
>>
>> 3. In the returns() function of the map you enforce the types
>>
>>
>>
>> DataStream<Tuple> stream = env.socketTextStream(…)
>>     .map(new MapFunction<String, Tuple>() {
>>         public Tuple map(String value) {
>>             Tuple out = Tuple.getTupleClass(#).newInstance();
>>             …
>>             out.setField(SqlTimeTypeInfo.TIMESTAMP, 0);
>>             …
>>         }
>>     })
>>     .returns("Tuple#<java.sql.Timestamp,…>");
>>
>>
>>
>>
>>
>> The problem is that if you rely on the type extraction mechanism invoked by
>> returns() to recognize the timestamp field as SqlTimeTypeInfo.TIMESTAMP, that
>> does not happen; a GenericType<java.sql.Timestamp> is created instead.
>>
>> It looks like the type parsers were not extended to handle these types.
>>
>>
>>
>> Dr. Radu Tudoran
>>
>>
>>
>> From: Fabian Hueske [mailto:fhueske@gmail.com]
>> Sent: Wednesday, October 26, 2016 10:11 PM
>> To: user@flink.apache.org
>> Subject: Re: TIMESTAMP TypeInformation
>>
>>
>>
>> Hi Radu,
>>
>> I might not have completely understood your problem, but if you do
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
>> val t = ds.toTable(tEnv, 'a, 'b, 'c)
>>
>> val results = t
>>     .select('c + 10.seconds)
>>
>> then field 'c will be of type SqlTimeTypeInfo and handled as such.
>>
>> Hope this helps,
>>
>> Fabian
>>
>>
>>
>> 2016-10-25 17:32 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>
>> Re-hi,
>>
>>
>>
>> I actually realized that the problem comes from the fact that the
>> DataStream that I am registering does not create the types properly.
>>
>>
>>
>> I am using something like
>>
>>
>>
>> DataStream<Tuple> … .returns("TupleX<,….,java.sql.Timestamp,
>> java.sql.Time>")… and I was expecting that these would be converted to
>> SqlTimeTypeInfo… but they are converted to GenericType. Any thoughts on how
>> I could force the type to be recognized as a SqlTimeTypeInfo?
>>
>>
>>
>>
>>
>> From: Radu Tudoran
>> Sent: Tuesday, October 25, 2016 4:46 PM
>> To: 'user@flink.apache.org'
>> Subject: TIMESTAMP TypeInformation
>>
>>
>>
>> Hi,
>>
>>
>>
>> I would like to create a TIMESTAMP type from the data schema. I would
>> need this to match against the FlinkTypeFactory (toTypeInfo())
>>
>>
>>
>> def toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
>>   relDataType.getSqlTypeName match {
>>     case BOOLEAN => BOOLEAN_TYPE_INFO
>>     case TINYINT => BYTE_TYPE_INFO
>>     case SMALLINT => SHORT_TYPE_INFO
>>     case INTEGER => INT_TYPE_INFO
>>     case BIGINT => LONG_TYPE_INFO
>>     case FLOAT => FLOAT_TYPE_INFO
>>     case DOUBLE => DOUBLE_TYPE_INFO
>>     case VARCHAR | CHAR => STRING_TYPE_INFO
>>     case DECIMAL => BIG_DEC_TYPE_INFO
>>
>>     // date/time types
>>     case DATE => SqlTimeTypeInfo.DATE
>>     case TIME => SqlTimeTypeInfo.TIME
>>     case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
>>
>>
>>
>> I tried to create the TypeInformation by using SqlTimeTypeInfo.TIMESTAMP
>> directly. However, it seems that relDataType.getSqlTypeName matches ANY
>> instead of TIMESTAMP.
>>
>>
>>
>> Any thoughts on how to create the proper TIMESTAMP TypeInformation?
>>
>
>
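To illustrate the failure mode discussed in this thread, here is a minimal, self-contained sketch of a type-string resolver that falls back to a generic type for any class name it does not know about. This is a toy model only, not Flink's TypeInfoParser: the class TypeStringParserSketch and its methods register() and resolve() are hypothetical names, and the descriptors are plain strings rather than TypeInformation objects. It only shows why an unregistered java.sql.Timestamp ends up as a generic type, and how adding an entry for it (the kind of extension Fabian suggests) changes the result.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a type-string resolver. NOT Flink's TypeInfoParser;
 * every name in this class is hypothetical and for illustration only.
 */
final class TypeStringParserSketch {

    // Maps fully qualified class names to the name of a specialized descriptor.
    private final Map<String, String> specialized = new HashMap<>();

    TypeStringParserSketch() {
        // A few types the resolver knows out of the box (assumed for the sketch).
        specialized.put("java.lang.String", "String");
        specialized.put("java.lang.Long", "Long");
    }

    /** Registers a specialized descriptor for the given class name. */
    void register(String className, String descriptor) {
        specialized.put(className, descriptor);
    }

    /** Resolves a class name; unknown classes fall back to a generic descriptor. */
    String resolve(String className) {
        String known = specialized.get(className);
        return known != null ? known : "GenericType<" + className + ">";
    }

    public static void main(String[] args) {
        TypeStringParserSketch parser = new TypeStringParserSketch();

        // Before registration the resolver does not know java.sql.Timestamp.
        System.out.println(parser.resolve("java.sql.Timestamp"));
        // prints "GenericType<java.sql.Timestamp>"

        // After registering the java.sql type, the specialized descriptor is used.
        parser.register("java.sql.Timestamp", "SqlTimeTypeInfo.TIMESTAMP");
        System.out.println(parser.resolve("java.sql.Timestamp"));
        // prints "SqlTimeTypeInfo.TIMESTAMP"
    }
}
```

The lookup-with-fallback shape is the point of the sketch: nothing fails when a type is missing from the table, so the problem only becomes visible later, when downstream code expects the specialized type.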
