flink-user mailing list archives

From Xingcan Cui <xingc...@gmail.com>
Subject Re: Question about Timestamp in Flink SQL
Date Tue, 28 Nov 2017 15:32:13 GMT
Hi wangsan,

In Flink, processing time is obtained by calling
System.currentTimeMillis(), and the resulting long value is automatically
wrapped in a Timestamp with the following statement:

`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`

You can check TimeZone.getDefault() to see whether it returns the right
time zone. Generally, the returned value depends on the default time zone
of your operating system.
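
To make the effect of that statement easier to see, here is a minimal Java
sketch. It is only an illustration and not Flink's actual code path: the
class name ProcTimeOffsetDemo and the helper wrap() are made up for this
example, and only the expression inside wrap() is taken from the statement
above. It prints the JVM default time zone, its offset, and the timestamp
before and after the subtraction.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo class; not part of Flink.
class ProcTimeOffsetDemo {

    // Same expression as the statement quoted above, with the time zone
    // passed in explicitly so that it can be tried with different zones.
    static Timestamp wrap(long time, TimeZone zone) {
        return new Timestamp(time - zone.getOffset(time));
    }

    public static void main(String[] args) {
        TimeZone zone = TimeZone.getDefault();
        long now = System.currentTimeMillis();

        System.out.println("default zone : " + zone.getID());
        System.out.println("offset (ms)  : " + zone.getOffset(now));
        // Timestamp.toString() renders the value in the JVM default zone.
        System.out.println("raw time     : " + new Timestamp(now));
        System.out.println("wrapped time : " + wrap(now, zone));
    }
}
```

If the default zone reported here is GMT+8, the offset is 28800000 ms, so
the wrapped value is 8 hours earlier than the raw one. Running the JVM with
-Duser.timezone=UTC is one way to compare against a zero offset.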

Hope that helps.

Best,
Xingcan

On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamgsam@163.com> wrote:

> Hi all,
>
> When using Timestamp in Flink SQL, how can I set the time zone? My
> current time zone is *GMT+8*, and I found that the selected processing time
> is always *8 hours* behind the current time. The same goes for the
> extracted event time.
>
> Here’s my simplified code:
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
>
> val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
>
> senv.execute("foo")
>
> And here’s the result:
>
>
> Best,
> wangsan
>
