Date: Mon, 23 Nov 2020 06:24:00 +0000 (UTC)
From: 谢波 (Jira)
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-20277) flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure

    [ https://issues.apache.org/jira/browse/FLINK-20277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237143#comment-17237143 ]

谢波 commented on FLINK-20277:
----------------------------

Sorry, I'll pay attention next time.

I have a job that reads a Hive table once a minute. When a task fails, the job restarts over and over, throws an exception on every attempt, and never recovers. The exception appears to be thrown when there are no files under the Hive table, and once it has been thrown, the state can no longer be restored.

> flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure
> -------------------------------------------------------------------------
>
> Key: FLINK-20277
> URL: https://issues.apache.org/jira/browse/FLINK-20277
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem
> Affects Versions: 1.11.2
> Reporter: 谢波
> Assignee: godfrey he
> Priority: Blocker
> Fix For: 1.11.3
>
>
> When a Hive table is consumed as a stream and a failure occurs, the job cannot recover normally and keeps restarting.
> It keeps failing with the error: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.
>
> java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut
> at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
> at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262) ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>
> 2020-11-23 05:00:33,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: HiveFileMonitoringFunction -> Sink: Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json],
> fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu, aedat, ernam, pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t,
> zbd1p, zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate, bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf,
> telf1, llief, kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez,
> logsy, upinc, stako, frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l, stceg, absgr, addnr, kornr, memory, procstat,
> rlwrt, revno, scmproc, reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_id, hierarchy_exists, threshold_exists,
> legal_contract, description, release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys, auflg, yxcort, ysyb, ypsfs,
> yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
> ...skipping...
> java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
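The second trace fails inside `ContinuousFileMonitoringFunction.initializeState`, and its message suggests a guard that rejects a restore when two sources of "last modification time" state are present at once: a value from the checkpointed list state, and a value the function object already holds in memory. The Java sketch below is a hypothetical, simplified model of such a guard (the names `RestoreGuardSketch`, `MonitoringState`, and `UNSET` are invented for illustration); it is not Flink's source code. It only shows why a job whose function object already carries a set modification time would hit the same exception on every restart. It does not claim to explain how that value came to be set in the reported job, nor what the actual fix for FLINK-20277 was.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of a restore guard; NOT Flink source code. */
public class RestoreGuardSketch {

    // Assumed sentinel meaning "no modification time recorded yet".
    static final long UNSET = Long.MIN_VALUE;

    static final class MonitoringState {
        // In-memory modification time carried by the function object before restore.
        long globalModificationTime;

        MonitoringState(long initialModTime) {
            this.globalModificationTime = initialModTime;
        }

        // checkpointed: entries found in the restored list state (0 or 1 expected
        // for a source that runs with parallelism 1).
        void initializeState(boolean isRestored, List<Long> checkpointed) {
            if (!isRestored) {
                return; // fresh start: nothing to restore
            }
            if (checkpointed.size() > 1) {
                throw new IllegalArgumentException("retrieved invalid state.");
            }
            if (checkpointed.size() == 1 && globalModificationTime != UNSET) {
                // Checkpointed value AND an already-set in-memory value are treated
                // as conflicting state, so the restore is rejected.
                throw new IllegalArgumentException(
                        "The ContinuousFileMonitoringFunction has already restored"
                                + " from a previous Flink version.");
            }
            if (checkpointed.size() == 1) {
                // Only the checkpointed value exists: adopt it.
                globalModificationTime = checkpointed.get(0);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> checkpoint = Arrays.asList(1_606_100_000_000L);

        // Case 1: in-memory time is unset, so the checkpointed value is adopted.
        MonitoringState fresh = new MonitoringState(UNSET);
        fresh.initializeState(true, checkpoint);
        System.out.println("restored mod time: " + fresh.globalModificationTime);

        // Case 2: in-memory time is already set, so every restore attempt is
        // rejected and a restart loop hits the same exception each time.
        MonitoringState preset = new MonitoringState(0L);
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                preset.initializeState(true, checkpoint);
            } catch (IllegalArgumentException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
    }
}
```

In this model the failure is deterministic: nothing in the restore path clears the in-memory value, so retrying cannot succeed. That is consistent with the "keeps restarting" behaviour reported above.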