flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: Updating ValueState not working in hosted Kinesis
Date Wed, 19 Feb 2020 11:55:26 GMT
Hi Chris,

it seems there are field serialized into state that actually don't 
belong there. You should aim to treat Sensor as a POJO instead of a Kryo 
generic serialized black-box type.

Furthermore, it seems that field such as 
"org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be 
state. Is there a "transient" keyword missing?

Are you trying to upgrade your job or the Flink version?

Regards,
Timo



On 18.02.20 18:59, Chris Stevens wrote:
> Hi there,
> 
> I'm trying to update state in one of my applications hosted in Kinesis 
> Data Analytics.
> 
> private transient ValueState<Sensor> sensorState;
> using sensorState.update(sensor);
> 
> Get error:
> 
> An error occurred: org.apache.flink.util.FlinkRuntimeException: Error 
> while adding data to RocksDB
> at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> at 
> org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> at 
> sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
> at 
> sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
> at 
> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
> at 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IllegalArgumentException: Unable to create serializer 
> "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: 
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs 
> (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
> at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: Unable to create 
> serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for 
> class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> at 
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
> at 
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
> at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
> at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
> at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
> at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at 
> com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
> at 
> com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 62 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at 
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
> ... 78 more
> Caused by: java.lang.NoClassDefFoundError: 
> Lorg/apache/commons/csv/CSVFormat;
> at java.lang.Class.getDeclaredFields0(Native Method)
> at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> at java.lang.Class.getDeclaredFields(Class.java:1916)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
> ... 82 more
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.commons.csv.CSVFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 88 more
> 
> Any help would be great. I tried manually including CSVFormat from 
> apache commons but didn't change anything.
> 
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595


Mime
View raw message