flink-user-zh mailing list archives

From 1101300123 <hdxg1101300...@163.com>
Subject 1.10: Using the Flink SQL row_number() function for top-1 throws an array index out-of-bounds error; asking the community for help
Date Tue, 12 May 2020 01:25:18 GMT


My SQL statement is as follows, with some fields omitted:
select
    a.contact_id,
    ...
    a.code_contact_channel
from (
    select
        contact_id,
        service_no,
        ...
        code_contact_channel,
        row_number() over (partition by contact_id, service_no order by operate_time desc) as rn
    from table1
) a
join (
    select
        contact_id,
        mobile_no,
        ...
        row_number() over (partition by contact_id, mobile_no order by create_time desc) as rn
    from table2
) b
    on a.contact_id = b.contact_id
    and a.service_no = b.mobile_no
where
    a.rn = 1
    and b.rn = 1;
The job is deployed on YARN. After running for a few hours it fails, and the log shows the following:


[flink-akka.actor.default-dispatcher-8695] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
partitionBy=[contact_id, service_no], orderBy=[operate_time DESC], select=[operate_time, contact_id.....])
-> Calc(select=[contact_id, start_time, contact_length, service_no...code_contact_channel])
(1/1) (52b8519ad9a44832a283c1760f385bf6) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:422)
 at java.util.ArrayList.remove(ArrayList.java:499)
 at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElementWithoutRowNumber(AppendOnlyTopNFunction.java:205)
 at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:120)
 at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:46)
 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
 at java.lang.Thread.run(Thread.java:748)
[flink-akka.actor.default-dispatcher-8695] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
- Calculating tasks to restart to recover the failed task 28aa070d07f48addbf378d6ee01a29c6_0.
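
For reference, the top two frames of the trace (ArrayList.remove calling ArrayList.elementData) show that ArrayList.remove(int) received an index of -1. The sketch below reproduces only that JDK behavior, outside Flink. The class NegativeIndexRemoveDemo and the helper removeFails are hypothetical names, and computing size() - 1 on an empty list is just one assumed way to arrive at -1; the log does not establish that this is what happens inside AppendOnlyTopNFunction.

```java
import java.util.ArrayList;
import java.util.List;

final class NegativeIndexRemoveDemo {

    // Hypothetical helper: reports whether list.remove(index) fails with an
    // index-out-of-bounds error instead of removing an element.
    static boolean removeFails(List<String> list, int index) {
        try {
            list.remove(index); // resolves to remove(int), not remove(Object)
            return false;
        } catch (IndexOutOfBoundsException e) {
            // ArrayIndexOutOfBoundsException is a subclass, so Java 8 lands here too.
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> buffer = new ArrayList<>();
        // Assumption for illustration: the "last index" of an empty list is -1.
        int lastIndex = buffer.size() - 1;
        System.out.println("lastIndex = " + lastIndex);
        System.out.println("remove failed: " + removeFails(buffer, lastIndex));
    }
}
```

On Java 8, which the line numbers in the trace suggest, the failure surfaces as java.lang.ArrayIndexOutOfBoundsException: -1; newer JDKs report an IndexOutOfBoundsException for the same call.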


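Flink SQL has supported Top-N queries for a long time, so I don't understand why this error occurs. I would appreciate any advice from the community.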
