flink-user mailing list archives

From Satish Chandra Gupta <scgupt...@gmail.com>
Subject Re: Flink job restart at checkpoint interval
Date Thu, 17 Nov 2016 02:08:47 GMT
Hi Till,

Thanks. Yes, that is what I have been doing. But accessing the GUI over VPN for
Flink running on a YARN cluster on EMR sometimes becomes very slow (sometimes
it does not even show the execution plan :-) ), which is why I asked about this.

Thanks,
+satish

On Wed, Nov 16, 2016 at 6:46 PM, Till Rohrmann <till.rohrmann@gmail.com>
wrote:

> Hi Satish,
>
> I'm afraid there is currently no way to configure the name of the
> checkpoint file for a task. For the latest checkpoint, you can
> see the state sizes of the individual subtasks in the web UI under
> Checkpoints.
>
> Cheers,
> Till
>
> On Tue, Nov 15, 2016 at 10:52 PM, Satish Chandra Gupta <
> scgupta74@gmail.com> wrote:
>
>> Hi Ufuk and Till,
>>
>> Thanks a lot. Both suggestions were useful. An older version of Xerces
>> was being loaded by one of the dependencies; I also fixed the
>> serialization glitch in my code, and now checkpointing works.
>>
>> I have 5 value states, plus a custom trigger.
>> Is there any way I can configure the filename in which these checkpoints are
>> saved? For example, in:
>>
>> <configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be
>>
>> Can I specify a name somewhere that gets included in the file ID?
>>
>> The reason I am asking is that one checkpoint file keeps growing, and I
>> suspect a bug in my logic is causing a memory leak. I want to
>> identify which value state is responsible. Any suggestions?
>>
>> Thanks,
>> +satish
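Since the checkpoint file names cannot be configured, one workaround is to log the approximate size of each state value from inside the operator and watch which one keeps growing. This is a minimal sketch that uses plain Java serialization as a stand-in measure. Flink's FsStateBackend writes state with the state's own serializer (the stack trace further down shows Kryo in use), so absolute byte counts will differ, but relative growth should still be visible. `StateSizeProbe` is a hypothetical helper, not part of Flink.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical debugging helper. Java serialization is only an approximation
// of what the state backend writes; use it to compare states and spot trends.
object StateSizeProbe {

  /** Number of bytes `value` occupies when Java-serialized (null is allowed). */
  def serializedSize(value: Any): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // Values typed as Any are already boxed, so the cast to AnyRef is safe.
    try out.writeObject(value.asInstanceOf[AnyRef])
    finally out.close() // close() flushes everything into `bytes`
    bytes.size()
  }

  /** One log line per named state, largest first. */
  def report(states: Seq[(String, Any)]): Seq[String] =
    states
      .map { case (name, value) => (name, serializedSize(value)) }
      .sortBy { case (_, size) => -size }
      .map { case (name, size) => s"$name: $size bytes" }
}
```

Inside a rich function one might call, for instance, `StateSizeProbe.report(Seq("pending" -> pendingState.value()))` every N records and log the result, where `pendingState` is a hypothetical ValueState handle. Sampling rather than measuring on every record keeps the overhead down, since serializing a large map is itself expensive. The case class stored in the map must be Java-serializable for this measurement to work (Scala case classes are, as long as their fields are).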
>>
>>
>> On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>
>>> Hi Satish,
>>>
>>> your problem seems to be related more to reading Hadoop's
>>> configuration. According to several reports online [1,2,3], selecting a
>>> compatible Xerces version should resolve the problem.
>>>
>>> [1] http://stackoverflow.com/questions/26974067/org-apache-hadoop-conf-configuration-loadresource-error
>>> [2] http://stackoverflow.com/questions/27860361/issue-http-apache-org-xml-features-xinclude-testing-log4j-2
>>> [3] http://dmitrypukhov.pro/apache-spark-feature-httpapache-orgxmlfeaturesxinclude-is-not-recognized/
>>>
>>> Cheers,
>>> Till
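Till's diagnosis can be checked directly against a classpath. Hadoop's `Configuration.loadResource` obtains a `DocumentBuilderFactory`, enables XInclude on it, and then calls `newDocumentBuilder()`. An outdated Xerces jar that wins the classpath rejects that feature, which produces the `ParserConfigurationException` in the trace. This is a minimal sketch that uses only the standard JAXP API. `XIncludeCheck` is a hypothetical helper name, and it mirrors Hadoop's calls rather than invoking Hadoop itself.

```scala
import javax.xml.parsers.{DocumentBuilderFactory, ParserConfigurationException}

// Hypothetical classpath check mirroring what Hadoop's Configuration does
// before parsing core-site.xml / hdfs-site.xml.
object XIncludeCheck {

  /** Which DocumentBuilderFactory implementation JAXP picks, and from where. */
  def factoryOrigin(): String = {
    val cls = DocumentBuilderFactory.newInstance().getClass
    // Classes on the bootstrap class path (e.g. the JDK's own parser) have no CodeSource.
    val location = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .map(_.toString)
      .getOrElse("the bootstrap class path (e.g. the JDK)")
    s"${cls.getName} loaded from $location"
  }

  /** True if the active parser accepts XInclude, as Hadoop's Configuration requires. */
  def supportsXInclude(): Boolean = {
    val factory = DocumentBuilderFactory.newInstance()
    factory.setNamespaceAware(true)
    try {
      factory.setXIncludeAware(true) // very old factories may throw UnsupportedOperationException here
      factory.newDocumentBuilder()   // old Xerces throws ParserConfigurationException here
      true
    } catch {
      case _: ParserConfigurationException => false
      case _: UnsupportedOperationException => false
    }
  }
}
```

Ideally this runs with the same classpath as the TaskManagers (the job jar plus the Flink and Hadoop libraries). If `supportsXInclude()` returns false, the location reported by `factoryOrigin()` points at the jar that needs to be excluded or upgraded.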
>>>
>>> On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <
>>> scgupta74@gmail.com> wrote:
>>>
>>>> Most of the ValueStates I am using are of type Long or Boolean, except one,
>>>> which is a map from Long to a Scala case class:
>>>>
>>>> ValueState[Map[Long, AnScalaCaseClass]]
>>>>
>>>>
>>>> Does this serialization happen only for the value state members of
>>>> operators, or also for other private fields?
>>>> Thanks
>>>> +satish
>>>>
>>>> On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>>> There seems to be an Exception happening when Flink tries to serialize
>>>>> the state of your operator (see the stack trace).
>>>>>
>>>>> What are you trying to store via the ValueState? Maybe you can share a
>>>>> code excerpt?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (
>>>>> scgupta74@gmail.com) wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I am using ValueState, backed by an FsStateBackend on HDFS, as
>>>>> > follows:
>>>>> >
>>>>> > env.setStateBackend(new FsStateBackend(stateBackendPath))
>>>>> > env.enableCheckpointing(checkpointInterval)
>>>>> >
>>>>> >
>>>>> > It is a non-iterative job running on Flink/YARN. The job restarts at
>>>>> > every checkpoint interval; I have tried intervals from 30 sec to 10 min.
>>>>> > Any idea why it could be restarting?
>>>>> >
>>>>> > I see the following exception in the log:
>>>>> >
>>>>> > ======
>>>>> >
>>>>> > 2016-11-14 09:24:28,787 INFO
>>>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> cell_users_update (1/1)
>>>>> > (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
>>>>> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager
>>>>> > - Status of job 03a56958263a688dc34cc8d5069aac8f
>>>>> > (Processor) changed to FAILING.
>>>>> > *java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier*
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
>>>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>>>> > at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
>>>>> > at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> > at java.lang.Thread.run(Thread.java:745)
>>>>> > Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception:
>>>>> > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>>>> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
>>>>> > at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>>>> > at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>>>> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
>>>>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
>>>>> > ... 8 more
>>>>> > Caused by: java.io.IOException: DataStreamer Exception:
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
>>>>> > Caused by: java.lang.ExceptionInInitializerError
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
>>>>> > Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
>>>>> > at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>>>> > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>>>> > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>>>> > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>>>> > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>>>> > at org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
>>>>> > ... 3 more
>>>>> > Caused by: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>>>> > at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
>>>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
>>>>> > ... 9 more
>>>>> > 2016-11-14 09:24:28,789 INFO
>>>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> device_status_update (1/1)
>>>>> > (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
>>>>> > 2016-11-14 09:24:28,789 INFO
>>>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> Map -> Filter ->
>>>>> > cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched
>>>>> > from RUNNING to CANCELING
>>>>> >
>>>>> > ======
>>>>> >
>>>>> > Thanks,
>>>>> > +satish
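A trace like the one above reads from the outside in. The RuntimeException about the checkpoint barrier and the KryoException are wrappers. The actionable error is the last `Caused by`: the Xerces `ParserConfigurationException` raised while Hadoop reads its configuration, which is what Till's reply addresses. When such an exception is caught programmatically (for example in a test harness), a small helper can walk the cause chain down to that innermost exception. This is a minimal sketch. `RootCause` is a hypothetical name, and it relies only on `Throwable.getCause`.

```scala
// Hypothetical helper: list a Throwable's cause chain and pick its innermost entry.
object RootCause {

  /** The throwable followed by each nested cause, outermost first. */
  def chain(t: Throwable): List[Throwable] = {
    require(t != null, "need a non-null throwable")
    @annotation.tailrec
    def loop(current: Throwable, seen: List[Throwable]): List[Throwable] =
      // Stop at the end of the chain, or if a cause refers back to an earlier one.
      if (current == null || seen.exists(_ eq current)) seen.reverse
      else loop(current.getCause, current :: seen)
    loop(t, Nil)
  }

  /** The innermost cause, i.e. the last "Caused by" in a printed stack trace. */
  def rootCause(t: Throwable): Throwable = chain(t).last
}
```

For a chain shaped like the one in the log, `RootCause.rootCause(e).getMessage` would return the "Feature 'http://apache.org/xml/features/xinclude' is not recognized." message rather than the generic checkpoint error.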
>>>>> >
>>>>>
>>>>>
>>>>
>>>
>>
>
