flink-user mailing list archives

From Satish Chandra Gupta <scgupt...@gmail.com>
Subject Re: Flink job restart at checkpoint interval
Date Tue, 15 Nov 2016 21:52:45 GMT
Hi Ufuk and Till,

Thanks a lot. Both of these suggestions were useful. An older version of
xerces was being loaded from one of the dependencies. I also fixed the
serialization glitch in my code, and now checkpointing works.
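
For anyone hitting the same kind of xerces conflict, one way to confirm which jar supplies the XML parser is to print the code source of the JAXP factory class that actually gets instantiated. The following is a minimal sketch and not something taken from this thread; the names `WhichJar` and `locationOf` are made up for illustration, and it assumes the check is run with the same classpath as the job.

```scala
// Sketch: report which jar (or directory) a class was loaded from.
object WhichJar {
  /** Code-source location of `cls`, or None when the JVM does not expose one
    * (typically the case for classes that ship with the JDK itself). */
  def locationOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // newInstance() resolves whichever JAXP implementation the classpath provides.
    val factory = javax.xml.parsers.DocumentBuilderFactory.newInstance()
    println(s"JAXP factory class: ${factory.getClass.getName}")
    println(s"Loaded from: ${locationOf(factory.getClass).getOrElse("<no code source reported>")}")
  }
}
```

If the reported location is a dependency jar rather than the JDK, that jar is the one to exclude or upgrade.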

I have 5 value states in addition to a custom trigger. Is there any way I
can configure the file name under which these checkpoints are saved? For
example, in:

<configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be

Can I specify a name somewhere that gets included in the file ID?

The reason I am asking is that one checkpoint file keeps growing. I suspect
a bug in my logic is causing a memory leak, and I want to identify which
value state is responsible. Any suggestions?
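
One rough way to narrow this down without relying on checkpoint file names would be to log an approximate serialized size for each state value from inside the operator and watch which one grows. The sketch below is only an illustration under stated assumptions: it uses plain Java serialization as a proxy (Flink's own serializers, Kryo in the trace quoted below, produce different byte counts), and `StateSizeProbe`, `approxBytes`, and `Entry` are hypothetical names rather than Flink APIs.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StateSizeProbe {
  /** Approximate size in bytes of `value` under Java serialization.
    * Only a proxy for growth; not the size Flink writes to the checkpoint. */
  def approxBytes(value: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.size()
  }

  // Hypothetical stand-in for the case class kept in the map-valued state.
  final case class Entry(id: Long, label: String)

  def main(args: Array[String]): Unit = {
    val small: Map[Long, Entry] = (1L to 10L).map(i => i -> Entry(i, s"e$i")).toMap
    val large: Map[Long, Entry] = (1L to 1000L).map(i => i -> Entry(i, s"e$i")).toMap
    // In a real operator these lines would be log statements next to state updates.
    println(s"entries=${small.size} approxBytes=${approxBytes(small)}")
    println(s"entries=${large.size} approxBytes=${approxBytes(large)}")
  }
}
```

A map-valued state whose entry count rises steadily across checkpoints is the most likely candidate, since the Long and Boolean states have a fixed size per key.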

Thanks,
+satish


On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Satish,
>
> your problem seems to be related to reading Hadoop's configuration.
> According to several reports online [1,2,3], selecting a compatible
> xerces version should resolve the problem.
>
> [1] http://stackoverflow.com/questions/26974067/org-apache-hadoop-conf-configuration-loadresource-error
> [2] http://stackoverflow.com/questions/27860361/issue-http-apache-org-xml-features-xinclude-testing-log4j-2
> [3] http://dmitrypukhov.pro/apache-spark-feature-httpapache-orgxmlfeaturesxinclude-is-not-recognized/
>
> Cheers,
> Till
>
> On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <scgupta74@gmail.com> wrote:
>
>> Most of the ValueState I am using are of Long or Boolean, except one
>> which is a map of Long to Scala case class:
>>
>> ValueState[Map[Long, AnScalaCaseClass]]
>>
>>
>> Does this serialization happen only for the value state members of
>> operators, or also other private fields?
>> Thanks
>> +satish
>>
>> On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <uce@apache.org> wrote:
>>
>>> There seems to be an Exception happening when Flink tries to serialize
>>> the state of your operator (see the stack trace).
>>>
>>> What are you trying to store via the ValueState? Maybe you can share a
>>> code excerpt?
>>>
>>> – Ufuk
>>>
>>> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (
>>> scgupta74@gmail.com) wrote:
>>> > Hi,
>>> >
>>> > I am using ValueState, backed by FsStateBackend on HDFS, as follows:
>>> >
>>> > env.setStateBackend(new FsStateBackend(stateBackendPath))
>>> > env.enableCheckpointing(checkpointInterval)
>>> >
>>> >
>>> > It is a non-iterative job running on Flink/YARN. The job restarts at
>>> > every checkpointInterval; I have tried intervals varying from 30 sec
>>> > to 10 min. Any idea why it could be restarting?
>>> >
>>> > I see the following exception in the log:
>>> >
>>> > ======
>>> >
>>> > 2016-11-14 09:24:28,787 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> cell_users_update (1/1) (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
>>> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager - Status of job 03a56958263a688dc34cc8d5069aac8f (Processor) changed to FAILING.
>>> > *java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier*
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>> > at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
>>> > at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception:
>>> > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
>>> > at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>> > at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
>>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
>>> > ... 8 more
>>> > Caused by: java.io.IOException: DataStreamer Exception:
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
>>> > Caused by: java.lang.ExceptionInInitializerError
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
>>> > Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
>>> > at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>> > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>> > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>> > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>> > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>> > at org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
>>> > ... 3 more
>>> > Caused by: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>> > at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
>>> > ... 9 more
>>> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> device_status_update (1/1) (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
>>> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> Map -> Filter -> cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched from RUNNING to CANCELING
>>> >
>>> > ======
>>> >
>>> > Thanks,
>>> > +satish
>>> >
>>>
>>>
>>
>
