From: "Tzu-Li (Gordon) Tai"
To: Ted Yu, MAHESH KUMAR
Cc: user
Delivered-To: mailing list user@flink.apache.org
Date: Sat, 3 Jun 2017 21:37:01 +0200
Subject: Re: Flink 1.3 - Checkpointing failing

Hi Mahesh,

Thanks a lot for reporting this. This would be a bug: https://issues.apache.org/jira/browse/FLINK-6844.
Apparently the TraversableSerializer could take part in checkpointing and therefore should implement the new compatibility methods.
I’ll make sure that the fix for this gets into 1.3.1.

Cheers,
Gordon

On 3 June 2017 at 5:41:12 AM, Ted Yu (yuzhihong@gmail.com) wrote:

If I read CompositeTypeSerializerConfigSnapshot ctor correctly:

    for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
      TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration();
      this.nestedSerializersAndConfigs.add(

The UnsupportedOperationException thrown by snapshotConfiguration() should be caught without proceeding to nestedSerializersAndConfigs.add().

On Fri, Jun 2, 2017 at 7:02 PM, Ted Yu wrote:

Your case doesn't seem like FLINK-5462 since there was no CancellationException in the stack trace you posted.

The exception from TraversableSerializer.snapshotConfiguration() was added by FLINK-6178

FYI

On Fri, Jun 2, 2017 at 4:04 PM, MAHESH KUMAR wrote:

Hi Team,

We have some test cases written using StreamingMultipleProgramsTestBase
It was working fine in version 1.2, we get the following error in version 1.3
It seems like CheckpointCoordinator fails after this error and Checkpointing no longer occurs.

I came across this bug: https://issues.apache.org/jira/browse/FLINK-5462 , It looks kind of similar but I am not exactly sure.

2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | org.apache.flink.runtime.executiongraph.ExecutionGraph  | Could not restart the job pipeline_message_auditor (f54182ae17352efb9aa40667c283ce09) because the restart strategy prevented it.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_112]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_112]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112]
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_112]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_112]
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    ... 5 common frames omitted
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    ... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
    ... 7 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
    ... 5 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155) ~[flink-scala_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71) ~[flink-runtime_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389) ~[flink-statebackend-rocksdb_2.11-1.3.0.jar:1.3.0]
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) ~[flink-runtime_2.11-1.3.0.jar:1.3.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) ~[flink-core-1.3.0.jar:1.3.0]
    ... 6 common frames omitted
2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | o.apache.flink.runtime.checkpoint.CheckpointCoordinator | Stopping checkpoint coordinator for job f54182ae17352efb9aa40667c283ce09
2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 | o.a.f.r.checkpoint.StandaloneCompletedCheckpointStore   | Shutting down

Thanks and Regards,
Mahesh
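
Note on the failure path discussed in this thread: the constructor loop Ted quotes snapshots every nested serializer eagerly, so a single nested serializer that throws UnsupportedOperationException aborts the snapshot of the enclosing serializer. The following is a minimal sketch of that behaviour in plain Java, not Flink's code. `Serializer`, `SupportedSerializer`, `UnsupportedSerializer`, `CompositeSnapshot` and `canSnapshot` are hypothetical stand-ins invented for illustration; only the shape of the loop is taken from the constructor quoted in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SnapshotFailureSketch {

    /** Hypothetical stand-in for a serializer; this is NOT Flink's TypeSerializer. */
    interface Serializer {
        String snapshotConfiguration();
    }

    /** Stand-in for a serializer that implements the snapshot method. */
    static final class SupportedSerializer implements Serializer {
        private final String name;

        SupportedSerializer(String name) {
            this.name = name;
        }

        @Override
        public String snapshotConfiguration() {
            return name + "-config";
        }
    }

    /** Stand-in for a serializer whose snapshot method is not implemented. */
    static final class UnsupportedSerializer implements Serializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }
    }

    /** Mirrors the shape of the quoted loop: every nested serializer is snapshotted in the constructor. */
    static final class CompositeSnapshot {
        final List<String> nestedConfigs = new ArrayList<>();

        CompositeSnapshot(List<Serializer> nestedSerializers) {
            for (Serializer nested : nestedSerializers) {
                // An exception thrown here aborts construction of the whole composite snapshot.
                nestedConfigs.add(nested.snapshotConfiguration());
            }
        }
    }

    /** Returns true if the composite snapshot can be built, false if a nested serializer refuses. */
    static boolean canSnapshot(List<Serializer> nestedSerializers) {
        try {
            new CompositeSnapshot(nestedSerializers);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Serializer> allSupported =
                Arrays.<Serializer>asList(new SupportedSerializer("a"), new SupportedSerializer("b"));
        List<Serializer> oneUnsupported =
                Arrays.<Serializer>asList(new SupportedSerializer("a"), new UnsupportedSerializer());

        System.out.println(canSnapshot(allSupported));   // prints "true"
        System.out.println(canSnapshot(oneUnsupported)); // prints "false"
    }
}
```

In the reported trace the same pattern shows up as TupleSerializerBase.snapshotConfiguration() failing because the nested TraversableSerializer throws. The fix itself is tracked in FLINK-6844 and is not reproduced here.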