flink-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Flink 1.3 - Checkpointing failing
Date Sat, 03 Jun 2017 03:40:59 GMT
If I read CompositeTypeSerializerConfigSnapshot ctor correctly:

    for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
      TypeSerializerConfigSnapshot configSnapshot =
          nestedSerializer.snapshotConfiguration();
      this.nestedSerializersAndConfigs.add(

The UnsupportedOperationException thrown by snapshotConfiguration() should
be caught, so that execution does not proceed to
nestedSerializersAndConfigs.add().
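
To illustrate the control flow I have in mind, here is a minimal,
self-contained sketch. It does not use Flink's classes: SketchSerializer,
collectSnapshots and the string "snapshots" are hypothetical stand-ins made
up for this example, and silently skipping a nested serializer is only an
assumption about how the failure could be handled, not a statement of what
the actual fix should be.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedSnapshotSketch {

    /** Hypothetical stand-in for a serializer; this is NOT Flink's TypeSerializer. */
    interface SketchSerializer {
        String snapshotConfiguration();
    }

    /**
     * Collects one snapshot per nested serializer. A serializer whose
     * snapshotConfiguration() throws UnsupportedOperationException is
     * skipped, so add() is never reached for it.
     */
    static List<String> collectSnapshots(List<SketchSerializer> nestedSerializers) {
        List<String> snapshots = new ArrayList<>();
        for (SketchSerializer nestedSerializer : nestedSerializers) {
            String configSnapshot;
            try {
                configSnapshot = nestedSerializer.snapshotConfiguration();
            } catch (UnsupportedOperationException e) {
                // Assumption for this sketch: skip the serializer instead of
                // failing the whole snapshot.
                continue;
            }
            snapshots.add(configSnapshot);
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<SketchSerializer> nested = new ArrayList<>();
        nested.add(() -> "int-config");
        nested.add(() -> { throw new UnsupportedOperationException(); });
        nested.add(() -> "string-config");

        // prints [int-config, string-config]
        System.out.println(collectSnapshots(nested));
    }
}
```

Whether skipping is acceptable depends on whether the missing snapshot is
needed later when state is restored; the sketch only shows how to keep the
exception from reaching the add() call.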

On Fri, Jun 2, 2017 at 7:02 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> Your case doesn't seem like FLINK-5462 since there was no CancellationException
> in the stack trace you posted.
>
> The exception from TraversableSerializer.snapshotConfiguration() was
> added by FLINK-6178.
>
> FYI
>
> On Fri, Jun 2, 2017 at 4:04 PM, MAHESH KUMAR <r.mahesh.kumar.blr@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> We have some test cases written using StreamingMultipleProgramsTestBase.
>> They worked fine in version 1.2, but in version 1.3 we get the following
>> error. It seems that the CheckpointCoordinator fails after this error and
>> checkpointing no longer occurs.
>>
>> I came across this bug: https://issues.apache.org/jira/browse/FLINK-5462
>> It looks somewhat similar, but I am not sure it is the same issue.
>>
>> 2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | org.apache.flink.runtime.executiongraph.ExecutionGraph  | Could not restart the job pipeline_message_auditor (f54182ae17352efb9aa40667c283ce09) because the restart strategy prevented it.
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_112]
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_112]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_112]
>> at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112]
>> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
>> ... 6 common frames omitted
>> Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_112]
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_112]
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.0.jar:1.3.0]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
>> ... 5 common frames omitted
>> Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
>> ... 5 common frames omitted
>> Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>> ... 7 common frames omitted
>> Caused by: java.lang.UnsupportedOperationException: null
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
>> at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
>> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510)
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407)
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389)
>> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
>> ... 5 common frames omitted
>> Caused by: java.lang.UnsupportedOperationException: null
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155) ~[flink-scala_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53) ~[flink-core-1.3.0.jar:1.3.0]
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45) ~[flink-core-1.3.0.jar:1.3.0]
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132) ~[flink-core-1.3.0.jar:1.3.0]
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39) ~[flink-core-1.3.0.jar:1.3.0]
>> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71) ~[flink-runtime_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
>> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) ~[flink-runtime_2.11-1.3.0.jar:1.3.0]
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) ~[flink-core-1.3.0.jar:1.3.0]
>> ... 6 common frames omitted
>> 2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | o.apache.flink.runtime.checkpoint.CheckpointCoordinator | Stopping checkpoint coordinator for job f54182ae17352efb9aa40667c283ce09
>> 2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | o.a.f.r.checkpoint.StandaloneCompletedCheckpointStore   | Shutting down
>>
>>
>> Thanks and Regards,
>> Mahesh
>>
>>
>>
>
