flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yu Yang <yuyan...@gmail.com>
Subject Re: can flink sql handle udf-generated timestamp field
Date Thu, 06 Jun 2019 06:38:04 GMT
+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang <yuyang08@gmail.com> wrote:

> Thanks for the reply!  In flink-table-planner, TimeIndicatorTypeInfo is an
> internal class that cannot be referenced from application. I got "cannot
> find symbol" error when I tried to use it. I have also tried to use "
> SqlTimeTypeInfo.getInfoFor(Timestamp.class) " as return type for my udf
> type info. With that, I got the same  "Window can only be defined over a
> time attribute column" error as before.
>
> On Wed, Jun 5, 2019 at 4:41 AM Lee tinker <lihanmiaodidichuxing@gmail.com>
> wrote:
>
>> Hi Yu Yang:
>> When you want to use time on window, the type of time should be right
>> according to flink.  We can see you return a Types.SQL_TIMESTAMP in your
>> UDF. This type should be TimeIndicatorTypeInfo.PROCTIME_INDICATOR
>> or  TimeIndicatorTypeInfo.ROWTIME_INDICATOR instead of Types.SQL_TIMESTAMP
>> according to your time type(proctime or rowtime). You can try it again by
>> using it.
>>
>> Yu Yang <yuyang08@gmail.com> 于2019年6月5日周三 下午2:57写道:
>>
>>> Hi,
>>>
>>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>>> the data stream, we store the timestamp in long type. So I wrote a UDF
>>> 'FROM_UNIXTIME' to convert long to Timestamp type.
>>>
>>>   public static class TimestampModifier extends ScalarFunction {
>>>     public Timestamp eval(long t) {
>>>       return new Timestamp(t);
>>>     }
>>>     public TypeInformation<?> getResultType(Class<?>[] signature)
{
>>>       return Types.SQL_TIMESTAMP;
>>>     }
>>>   }
>>>
>>> With the above UDF, I wrote the following query, and ran into
>>>  "ProgramInvocationException: The main method caused an error: Window can
>>> only be defined over a time attribute column".
>>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>>> this experiment.
>>>
>>> my sql query:
>>>
>>> select  keyid, sum(value)
>>> from (
>>>    select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>>    from orders)
>>>  group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid
>>>
>>> flink exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Window can only be defined over a time attribute
>>> column.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.table.api.ValidationException: Window can
>>> only be defined over a time attribute column.
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>>> at
>>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>>> at
>>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>>
>>> Regards,
>>> -Yu
>>>
>>

Mime
View raw message