flink-issues mailing list archives

From 谢波 (Jira) <j...@apache.org>
Subject [jira] [Commented] (FLINK-20277) flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure
Date Mon, 23 Nov 2020 06:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-20277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237143#comment-17237143 ]

谢波 commented on FLINK-20277:
----------------------------

Sorry, I'll pay attention next time.

I have a job that reads a Hive table once a minute. When a task fails, the job
restarts in a loop, throws the same exceptions each time, and never recovers. The
exception appears to be thrown when there are no files under the Hive table's
directory, and once it has been thrown, the state cannot be restored.
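
To make the suspected trigger concrete, here is a minimal sketch in plain Java of a directory listing that treats a missing directory as "no files yet" instead of failing. It is not Flink's code and not the fix for this issue: `MonitoredDirLister` and `listFilesOrEmpty` are hypothetical names, and the sketch uses `java.nio.file` on a local path rather than Flink's or Hadoop's file system classes.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper for illustration only; not part of Flink. */
class MonitoredDirLister {

    /**
     * Returns the regular files directly under dir, sorted by path.
     * A missing directory yields an empty list instead of an exception.
     */
    static List<Path> listFilesOrEmpty(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            // Nothing to read yet: the directory does not exist (or is not a directory).
            return Collections.emptyList();
        }
        List<Path> files = new ArrayList<>();
        try (Stream<Path> entries = Files.list(dir)) {
            entries.filter(Files::isRegularFile).forEach(files::add);
        } catch (NoSuchFileException e) {
            // The directory was removed between the check and the listing.
            return Collections.emptyList();
        }
        Collections.sort(files);
        return files;
    }
}
```

With a guard like this, a monitoring loop would see an empty result for a table directory that does not exist and could simply try again on the next scan.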

 

> flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure
> -------------------------------------------------------------------------
>
>                 Key: FLINK-20277
>                 URL: https://issues.apache.org/jira/browse/FLINK-20277
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>            Reporter: 谢波
>            Assignee: godfrey he
>            Priority: Blocker
>             Fix For: 1.11.3
>
>
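> When consuming a Hive table in streaming mode, the job cannot recover after a failure and keeps restarting.
> It keeps reporting the error: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.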
>  
> {color:#FF0000}java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut{color}
>  at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
>  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  
>  
> 2020-11-23 05:00:33,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: HiveFileMonitoringFunction -> Sink: Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json], fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu, aedat, ernam, pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t, zbd1p, zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate, bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf, telf1, llief, kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez, logsy, upinc, stako, frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l, stceg, absgr, addnr, kornr, memory, procstat, rlwrt, revno, scmproc, reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_id, hierarchy_exists, threshold_exists, legal_contract, description, release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys, auflg, yxcort, ysyb, ypsfs, yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
>  ...skipping...
>  {color:#FF0000}java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.{color}
>  at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
