flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wangsan <wamg...@163.com>
Subject Re: Question about Timestamp in Flink SQL
Date Wed, 29 Nov 2017 02:43:46 GMT
Hi Xincan,

Thanks for your reply. 

The system default timezone is just as what I expected (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).

I looked into the generated code, and I found the following code snippet:

```
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And what `internalToTimestamp` function did is:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I got the Timestamp(-28800000).
I am confused why `internalToTimestamp` need to subtract the offset?

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingcanc@gmail.com> wrote:
> 
> Hi wangsan,
> 
> in Flink, the ProcessingTime is just implemented by invoking System.currentTimeMillis()
and the long value will be automatically wrapped to a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check your TimeZone.getDefault() to see if it returns the right TimeZone. Generally,
the returned value should rely on the default TimeZone of your operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamgsam@163.com <mailto:wamgsam@163.com>>
wrote:
> Hi all,
> 
> While using Timestamp in Flint SQL, how can I set timezone info? Since my current timezone
is GMT+8, and I found the selected processing time is always 8 hours late than current time.
So as extracted event time.
> 
> Here’s my simplified code:
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new
Date())}")
> 
> val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost",
9999).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> And here’s the result:
> 
> <PastedGraphic-1.png>
> 
> Best,
> wangsan
> 


Mime
View raw message