flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: TIMESTAMP TypeInformation
Date Thu, 27 Oct 2016 14:20:13 GMT
Wouldn't that be orthogonal to adding it to the TypeInfoParser?

2016-10-27 15:22 GMT+02:00 Greg Hogan <code@greghogan.com>:

> Fabian,
>
> Should we instead add this as a registered TypeInfoFactory?
>
> Greg
>
> On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Yes, I think you are right.
>> TypeInfoParser needs to be extended to parse the java.sql.* types into
>> the corresponding TypeInfos.
>>
>> Can you open a JIRA for that?
>>
>> Thanks, Fabian
>>
>> 2016-10-27 9:31 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>
>>> Hi,
>>>
>>>
>>>
>>> Meanwhile I dug into this further, and I think I actually found a bug.
>>>
>>>
>>>
>>> The scenario that I was trying to describe is something like this:
>>>
>>> 1.       You have a generic DataStream of Tuple (alternatively I
>>> could move to Row, I guess) and you get the data from some source (e.g.,
>>> Kafka, a network socket…)
>>>
>>> 2.       In the map/flatMap function you parse the input and instantiate
>>> the tuple generically
>>>
>>> 3.       In the "returns()" call on the map you enforce the types
>>>
>>>
>>>
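>>> DataStream<Tuple> stream = env.socketTextStream(…)
>>>     .map(new MapFunction<String, Tuple>() {
>>>         @Override
>>>         public Tuple map(String value) throws Exception {
>>>             // instantiate a tuple of the required arity generically
>>>             Tuple out = Tuple.getTupleClass(#).newInstance();
>>>             …
>>>             // "timestamp" stands for a java.sql.Timestamp parsed from
>>>             // value (parsing elided); field 0 is the TIMESTAMP field
>>>             out.setField(timestamp, 0);
>>>             …
>>>             return out;
>>>         }
>>>     })
>>>     .returns("Tuple#<java.sql.Timestamp,…>");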
>>>
>>>
>>>
>>>
>>>
>>> The problem is that if you rely on the type extraction mechanism invoked
>>> by returns() to recognize java.sql.Timestamp as SqlTimeTypeInfo.TIMESTAMP,
>>> that does not happen; a GenericType<Timestamp> is created instead.
>>>
>>> It looks like the type parsers were not extended to consider these types.
>>>
>>>
>>>
>>> Dr. Radu Tudoran
>>>
>>>
>>>
>>> *From:* Fabian Hueske [mailto:fhueske@gmail.com]
>>> *Sent:* Wednesday, October 26, 2016 10:11 PM
>>> *To:* user@flink.apache.org
>>> *Subject:* Re: TIMESTAMP TypeInformation
>>>
>>>
>>>
>>> Hi Radu,
>>>
>>> I might not have completely understood your problem, but if you do
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
>>> val t = ds.toTable(tEnv, 'a, 'b, 'c)
>>>
>>> val results = t
>>>     .select('c + 10.seconds)
>>>
>>> then field 'c will be of type SqlTimeTypeInfo and handled as such.
>>>
>>> Hope this helps,
>>>
>>> Fabian
>>>
>>>
>>>
>>> 2016-10-25 17:32 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>>
>>> Re-hi,
>>>
>>>
>>>
>>> I actually realized that the problem comes from the fact that the
>>> DataStream that I am registering does not create the types properly.
>>>
>>>
>>>
>>> I am using something like
>>>
>>>
>>>
>>> DataStream<Tuple> … .returns("TupleX<,….,java.sql.Timestamp,
>>> java.sql.Time>")… and I was expecting that these would be converted to
>>> SqlTimeTypeInfo… but they are converted to GenericType. Any thoughts on
>>> how I could force the type to be recognized as a SqlTimeTypeInfo?
>>>
>>>
>>>
>>>
>>>
>>> *From:* Radu Tudoran
>>> *Sent:* Tuesday, October 25, 2016 4:46 PM
>>> *To:* 'user@flink.apache.org'
>>> *Subject:* TIMESTAMP TypeInformation
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I would like to create a TIMESTAMP type from the data schema. I would
>>> need this to match against the FlinkTypeFactory (toTypeInfo())
>>>
>>>
>>>
>>> def toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
>>> relDataType.getSqlTypeName match {
>>>     case BOOLEAN => BOOLEAN_TYPE_INFO
>>>     case TINYINT => BYTE_TYPE_INFO
>>>     case SMALLINT => SHORT_TYPE_INFO
>>>     case INTEGER => INT_TYPE_INFO
>>>     case BIGINT => LONG_TYPE_INFO
>>>     case FLOAT => FLOAT_TYPE_INFO
>>>     case DOUBLE => DOUBLE_TYPE_INFO
>>>     case VARCHAR | CHAR => STRING_TYPE_INFO
>>>     case DECIMAL => BIG_DEC_TYPE_INFO
>>>
>>>     // date/time types
>>>     case DATE => SqlTimeTypeInfo.DATE
>>>     case TIME => SqlTimeTypeInfo.TIME
>>>     case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
>>>
>>>
>>>
>>> I tried to create the TypeInformation by using SqlTimeTypeInfo.TIMESTAMP
>>> directly. However, it seems that relDataType.getSqlTypeName evaluates to
>>> ANY instead of TIMESTAMP.
>>>
>>>
>>>
>>> Any thoughts on how to create the proper TIMESTAMP TypeInformation?
>>>
>>
>>
>
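
The behaviour discussed in this thread (a type string naming java.sql.Timestamp ends up as a generic type because the parser has no entry for it) can be illustrated with a small stand-alone model. This is only a sketch in plain Java, not Flink code: the class TypeNameResolver, its methods, and the descriptor strings are all hypothetical names made up for illustration, and the real TypeInfoParser may work quite differently internally.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a type-string parser; none of these names are Flink APIs. */
final class TypeNameResolver {
    // maps fully qualified class names to a specialised type descriptor
    private final Map<String, String> known = new HashMap<>();

    TypeNameResolver() {
        // types this toy parser understands out of the box
        known.put("java.lang.String", "BasicType(String)");
        known.put("java.lang.Long", "BasicType(Long)");
    }

    /** Registers a specialised descriptor for a class name. */
    void register(String className, String descriptor) {
        known.put(className, descriptor);
    }

    /** Returns the specialised descriptor, or a generic fallback for unknown names. */
    String resolve(String className) {
        String descriptor = known.get(className);
        return descriptor != null ? descriptor : "GenericType(" + className + ")";
    }

    public static void main(String[] args) {
        TypeNameResolver resolver = new TypeNameResolver();
        // no entry for java.sql.Timestamp yet, so the generic fallback is used
        System.out.println(resolver.resolve("java.sql.Timestamp"));
        // "extending the parser" corresponds to adding the missing entry
        resolver.register("java.sql.Timestamp", "SqlTimeType(Timestamp)");
        System.out.println(resolver.resolve("java.sql.Timestamp"));
    }
}
```

In this model the first lookup falls through to the generic descriptor and the second returns the specialised one, which mirrors the suggestion above to extend the parser for the java.sql.* types.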
