From user-return-32725-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Feb 19 13:25:14 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2D71C180658 for ; Wed, 19 Feb 2020 14:25:14 +0100 (CET) Received: (qmail 64729 invoked by uid 500); 19 Feb 2020 13:25:11 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 64719 invoked by uid 99); 19 Feb 2020 13:25:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Feb 2020 13:25:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id CE3C8C0652 for ; Wed, 19 Feb 2020 13:25:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.211 X-Spam-Level: X-Spam-Status: No, score=0.211 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=0.2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H2=-0.001, SPF_HELO_NONE=0.001, T_SPF_PERMERROR=0.01, URIBL_BLOCKED=0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=sensingfeeling-com.20150623.gappssmtp.com Received: from mx1-ec2-va.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id nji7o3ACSlO6 for ; Wed, 19 Feb 2020 13:25:08 +0000 (UTC) Received-SPF: Permerror (mailfrom) identity=mailfrom; client-ip=209.85.221.44; helo=mail-wr1-f44.google.com; envelope-from=chris@sensingfeeling.com; receiver= Received: from mail-wr1-f44.google.com (mail-wr1-f44.google.com [209.85.221.44]) by mx1-ec2-va.apache.org (ASF Mail Server at mx1-ec2-va.apache.org) with ESMTPS id 9FED2BB801 for ; Wed, 19 Feb 2020 13:25:07 +0000 (UTC) Received: by mail-wr1-f44.google.com with SMTP id w15so532643wru.4 for ; Wed, 19 Feb 2020 05:25:07 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=sensingfeeling-com.20150623.gappssmtp.com; s=20150623; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=GNJRoKt1FiH5ZIjqZTjLNnjTMrsia/8Bu7MRFSHMUjI=; b=X5rY9bFyyz99nOoFV0+K8T1GTbZ5wrzRTFgnT5J4kiSIbWWNxxtuFquqTXOeNgTfKh h4oHvslN0UaG5NYd/Ln5KNdK0akVPEmuxG5YeQWGO73oauqycSLFfj4S0IoGUjuK2vrb fraRMuHdv0FEEgQvpXBCOBje8xkVh2x7L6JqFbRjKebKanyUnW6I46KzPQ896Fvo+TEf z4cHxKeqvMMY0ec++1mReUsLN32gZYk0Nbg/y1q/3PNUx3XGHkZ3v8NH7C6T5uq9q0Zr T6VcaF/USTtzCPEOdxdJS1YBAU+cZJNdbZLIllAooPSGHG77+iAe8WcHLB2bnSQM8ccw kaZA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=GNJRoKt1FiH5ZIjqZTjLNnjTMrsia/8Bu7MRFSHMUjI=; b=V3X9HMLsN4h1VreX8T93b7tjinGyv8PTKqodnoCWLHYbpb1nMCEopTGp4KQ4PXOZR0 XpJDKTyUpqeuBjQ/3yjDcf7wNPUuzNzv/kAeI/s9iJsn01yCsHB9WeZ5m5sFI2z3sY02 cLLfUBQtLdWfX6j0e7jKOXWyUm1hsbMWHgIHBuU4aGpZGdpkMCIgm7PdnG1JL3f52liZ Wr1O7KhzLV+2+b24MtXGYYTtWhb4aIYBe40Tm/vI8R42pENyPU8f+D9dNzKZB05nNXH3 T6YrIiumAa9Ol/GH0RtciY1vnhTaIfX0kG9hlA5McS9SFwOrVVAN6Bn0XCOFf3DWM/g2 /7uw== X-Gm-Message-State: APjAAAXC01HjlP3x15FRgUr3i8+akWCW9OWvv1SWd/bwEEepbS49y0dq 2xmTskb7eLoCNu/MnVBI4YEWMDNwgHB+fNM8iV6grw== X-Google-Smtp-Source: APXvYqy95UBNi4yYCTdW0pMBrQIZFv1wsKouXaoEFfvjH0erIWczcUSjw5PA9qXQ3jmeRl4dfAmF8Rq5wYo33sKzi/k= X-Received: by 2002:adf:df0e:: with SMTP id y14mr34573106wrl.377.1582118701194; Wed, 19 Feb 2020 05:25:01 -0800 (PST) MIME-Version: 1.0 References: <8cb02d8c-5070-3529-7e78-e5956ab1dcdf@apache.org> In-Reply-To: <8cb02d8c-5070-3529-7e78-e5956ab1dcdf@apache.org> From: Chris Stevens Date: Wed, 19 Feb 2020 13:24:50 +0000 Message-ID: Subject: Re: Re: Updating ValueState not working in hosted Kinesis To: Timo Walther Cc: user Content-Type: multipart/alternative; boundary="0000000000002d7955059eedb7f2" --0000000000002d7955059eedb7f2 Content-Type: text/plain; charset="UTF-8" Thanks again Timo, I hope I replied correctly this time. As per my previous message the Sensor class is a very simple POJO type (I think). When the serialization trace talks about PGSql stuff it makes me think that something from my operator is being included in serialization. Not just the Sensor object itself which I am explicitly including in state. package sensingfeeling.functions.mapping; public final class ArbJoinFunction extends RichJoinFunction, TypeA> { private static final long serialVersionUID = 8582433437601788991L; private transient ValueState sensorState; @Override public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException { Sensor sensor = sensorState.value(); if (sensor == null) { LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId); sensor = getSensor(frame); sensorState.update(sensor); } return new TypeA(); } @Override public void open(Configuration config) { LOG.debug("Sensor open method called", config); StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1)) .cleanupInBackground() .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility. ReturnExpiredIfNotCleanedUp).build(); ValueStateDescriptor sensorStateDescriptor = new ValueStateDescriptor<>( "sensor", TypeInformation.of(new TypeHint(){})); // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig); sensorState = getRuntimeContext().getState(sensorStateDescriptor); } private Sensor getSensor(TypeB frame) throws Exception { Class.forName("org.postgresql.Driver"); try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword); Statement st = con.createStatement(); ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" + frame.sensorId + "'" )) { if(rs.next()) { Sensor sensor = new Sensor() {}; LOG.debug("Got sensor" + sensor); return sensor; } } catch (SQLException ex) { LOG.error("Error when connection postgres", ex); throw ex; } return null; } } Above is a cut down version of my operator, I'm guessing it is the ResultSet rs that is getting serialized. How do I prevent this undesirable behaviour? I'm quite happy for my solution to serialize only what I explicitly tell it to, I don't need exactly once or anything. Many thanks, Chris Stevens Head of Research & Development +44 7565 034 595 On Wed, 19 Feb 2020 at 12:19, Timo Walther wrote: > Hi Chris, > > [forwarding the private discussion to the mailing list again] > > first of all, are you sure that your Sensor class is either a top-level > class or a static inner class. Because it seems there is way more stuff > in it (maybe included by accident transitively?). Such as: > > org.apache.logging.log4j.core.layout.AbstractCsvLayout > > Serialization trace: > > classes (sun.misc.Launcher$AppClassLoader) > > classloader (java.security.ProtectionDomain) > > cachedPDs (javax.security.auth.SubjectDomainCombiner) > > combiner (java.security.AccessControlContext) > > acc (sun.security.ssl.SSLSocketImpl) > > connection (org.postgresql.core.PGStream) > > pgStream (org.postgresql.core.v3.QueryExecutorImpl) > > transferModeRegistry (org.postgresql.core.v3.SimpleQuery) > > commitQuery (org.postgresql.jdbc.PgConnection) > > connection (org.postgresql.jdbc.PgResultSet) > > val$rs > > When declaring state you can use > `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class)` to > check if your state is a POJO type. > > Regards, > Timo > > > -------- Forwarded Message -------- > Subject: Re: Updating ValueState not working in hosted Kinesis > Date: Wed, 19 Feb 2020 12:02:16 +0000 > From: Chris Stevens > To: Timo Walther > > > > Hi Timo, > > Thanks for your reply. This makes sense to me, how do I treat something > as a POJO instead of a generic serialized BB type? Sorry relatively new > to Java and Flink. > > This is my full class def: > > package sensingfeeling.models; > import java.io.Serializable; > > public class Sensor implements Serializable { > > private static final long serialVersionUID = 8582433437601788991L; > public String sensorId; > public String companyId; > public String label; > // public Date createdAt; > // public Date updatedAt; > public Integer uncomfortableFaceLimit; > public Boolean online; > public String capabilityId; > // public Date lastOnlineAt; > // public Date lastOfflineAt; > public Integer onlineVersionNumber; > public int status; > @Override > public String toString(){ > return this.sensorId + " - " + this.label; > } > } > > Super simple really. > > I'm not trying to upgrade anything as far as I know. Just making an > operator state aware. > > Many thanks, > Chris Stevens > Head of Research & Development > +44 7565 034 595 > > > On Wed, 19 Feb 2020 at 11:55, Timo Walther > wrote: > > Hi Chris, > > it seems there are field serialized into state that actually don't > belong there. You should aim to treat Sensor as a POJO instead of a > Kryo > generic serialized black-box type. > > Furthermore, it seems that field such as > "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be > state. Is there a "transient" keyword missing? > > Are you trying to upgrade your job or the Flink version? > > Regards, > Timo > > > > On 18.02.20 18:59, Chris Stevens wrote: > > Hi there, > > > > I'm trying to update state in one of my applications hosted in > Kinesis > > Data Analytics. > > > > private transient ValueState sensorState; > > using sensorState.update(sensor); > > > > Get error: > > > > An error occurred: org.apache.flink.util.FlinkRuntimeException: > Error > > while adding data to RocksDB > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108) > > at > > > > > org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50) > > at > > > > > sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97) > > at > > > > > sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48) > > at > > > > > org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460) > > at > > > > > org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777) > > at > > > > > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44) > > at > > > > > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32) > > at > > > > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) > > at > > > > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) > > at > > > > > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255) > > at > > > > > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > > at > > > > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775) > > at > > org.apache.flink.streaming.runtime.io > > >.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) > > at > > > > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > > at > > > > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > > at > > org.apache.flink.streaming.runtime.io > > >.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > > at > > > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: com.esotericsoftware.kryo.KryoException: > > java.lang.IllegalArgumentException: Unable to create serializer > > "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: > > org.apache.logging.log4j.core.layout.AbstractCsvLayout > > Serialization trace: > > classes (sun.misc.Launcher$AppClassLoader) > > classloader (java.security.ProtectionDomain) > > cachedPDs (javax.security.auth.SubjectDomainCombiner) > > combiner (java.security.AccessControlContext) > > acc (sun.security.ssl.SSLSocketImpl) > > connection (org.postgresql.core.PGStream) > > pgStream (org.postgresql.core.v3.QueryExecutorImpl) > > transferModeRegistry (org.postgresql.core.v3.SimpleQuery) > > commitQuery (org.postgresql.jdbc.PgConnection) > > connection (org.postgresql.jdbc.PgResultSet) > > val$rs > > > > > (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > > at > > > > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88) > > at > > > > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > > at > > > > > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305) > > at > > > > > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362) > > at > > > > > org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142) > > at > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158) > > at > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178) > > at > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167) > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106) > > ... 20 more > > Caused by: java.lang.IllegalArgumentException: Unable to create > > serializer > "com.esotericsoftware.kryo.serializers.FieldSerializer" for > > class: org.apache.logging.log4j.core.layout.AbstractCsvLayout > > at > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48) > > at > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26) > > at > com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351) > > at > com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58) > > at > com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344) > > at > > > > > com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56) > > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461) > > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52) > > at > > > > > com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) > > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488) > > at > > > > > com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239) > > at > > > > > com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232) > > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) > > at > > > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) > > at > > > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > at > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > ... 62 more > > Caused by: java.lang.reflect.InvocationTargetException > > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown > Source) > > at > > > > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > > at > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35) > > ... 78 more > > Caused by: java.lang.NoClassDefFoundError: > > Lorg/apache/commons/csv/CSVFormat; > > at java.lang.Class.getDeclaredFields0(Native Method) > > at java.lang.Class.privateGetDeclaredFields(Class.java:2583) > > at java.lang.Class.getDeclaredFields(Class.java:1916) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156) > > at > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.(FieldSerializer.java:133) > > ... 82 more > > Caused by: java.lang.ClassNotFoundException: > > org.apache.commons.csv.CSVFormat > > at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:418) > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:351) > > ... 88 more > > > > Any help would be great. I tried manually including CSVFormat from > > apache commons but didn't change anything. > > > > Many thanks, > > Chris Stevens > > Head of Research & Development > > +44 7565 034 595 > > --0000000000002d7955059eedb7f2 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Thanks again Timo, I hope I replied correctly this ti= me.

As per my previous message the Sensor class is= a very simple POJO type (I think).

When the seria= lization trace talks about PGSql stuff it makes me think that something fro= m my operator is being included in serialization. Not just the Sensor objec= t itself which I am explicitly including in state.

=
p= ackage sensingfeeling.functio= ns.mapping;

publ= ic final class ArbJoinFunct= ion extends <= /span>RichJoinFunction<TypeB, TypeC>, TypeA> {

pri= vate static <= /span>final long serialVersionUID= =3D 8582433437601788991L;

private transient ValueState<Sensor> sensorState;

@Override<= /div>
public= TypeA join(TypeB <= span style=3D"color:rgb(156,220,254)">frame, TypeC activeMotionPaths) <= /span>throws JsonProc= essingException {
Sensor sensor =3D sensorState.value();
= if (sensor =3D=3D null) {
LOG.= debug("= Sensor was not in state, getting sensor: " + frame
.sensorId);
sensor =3D= getSensor(frame);
sensorState.update(sensor);
= }

= return new<= /span> TypeA();
}

=
@Override
public void open(Configuration config) {
LOG.debug("Sens= or open method called", = config);

= StateTtlConfig sensorTtlConfig =3D StateTtlConfig.= newBuilder(Time.minutes(= 1))
.cleanupInBackground()
= .setUpdateType(StateTtlConfig.UpdateType.OnCreateAnd= Write)
.setStateVisibility(StateTtlConfig= .StateVisibility.ReturnExpiredIfNotCleanedUp).build();

ValueStateDescriptor<Se= nsor> sensorStateDescriptor =3D new<= /span> ValueStateDescriptor<>( "sensor= ", TypeInformation.of(new TypeHint<Sensor>(){}));
/= / sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
sensorState =3D getRuntimeContext().get= State(sensorStateDescriptor);=

}
private Sensor = getSensor(TypeB frame) = throws Exception {

<= /span>Class.forName= ("org.postgresql.Driver");
try (Connection con =3D DriverManager.getConnection(= dbURL, dbUser, dbPassword);
State= ment st =3D = con.createSt= atement();
<= span style=3D"color:rgb(212,212,212)"> ResultSet rs =3D = st.executeQuery("SELECT * from senso= r where sensorid =3D '" + frame.= sensorId + "'" )) {

if<= span style=3D"color:rgb(212,212,212)">(rs.next()) {
= Sensor sensor =3D new Sensor() {};

LOG.debug("Got sensor" + sensor);

return sensor;
= }

= } catch (= SQLException ex) {
= LOG.error("Error when connection postgres", ex);
t= hrow ex;
}

return null;
}

}
=

= Above is a cut down version of my operator, I'm guessing it is the Resu= ltSet rs that is getting serialized. How do I prevent this undesirable beha= viour? I'm quite happy for my solution to serialize only what I explici= tly tell it to, I don't need exactly once or anything.
Many thanks,
Chris Stevens
Head of Researc= h & Development
+44 7565 034 595
<= /div>


On Wed, 19 Feb 2020 at 12:19, Timo Walther <<= a href=3D"mailto:twalthr@apache.org" target=3D"_blank">twalthr@apache.org> wrote:
Hi= Chris,

[forwarding the private discussion to the mailing list again]

first of all, are you sure that your Sensor class is either a top-level class or a static inner class. Because it seems there is way more stuff in it (maybe included by accident transitively?). Such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
=C2=A0 =C2=A0 =C2=A0 > Serialization trace:
=C2=A0 =C2=A0 =C2=A0 > classes (sun.misc.Launcher$AppClassLoader)
=C2=A0 =C2=A0 =C2=A0 > classloader (java.security.ProtectionDomain)
=C2=A0 =C2=A0 =C2=A0 > cachedPDs (javax.security.auth.SubjectDomainCombi= ner)
=C2=A0 =C2=A0 =C2=A0 > combiner (java.security.AccessControlContext)
=C2=A0 =C2=A0 =C2=A0 > acc (sun.security.ssl.SSLSocketImpl)
=C2=A0 =C2=A0 =C2=A0 > connection (org.postgresql.core.PGStream)
=C2=A0 =C2=A0 =C2=A0 > pgStream (org.postgresql.core.v3.QueryExecutorImp= l)
=C2=A0 =C2=A0 =C2=A0 > transferModeRegistry (org.postgresql.core.v3.Simp= leQuery)
=C2=A0 =C2=A0 =C2=A0 > commitQuery (org.postgresql.jdbc.PgConnection) =C2=A0 =C2=A0 =C2=A0 > connection (org.postgresql.jdbc.PgResultSet)
=C2=A0 =C2=A0 =C2=A0 > val$rs

When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)`= to
check if your state is a POJO type.

Regards,
Timo


-------- Forwarded Message --------
Subject:=C2=A0 =C2=A0 =C2=A0 =C2=A0 Re: Updating ValueState not working in = hosted Kinesis
Date:=C2=A0 =C2=A0Wed, 19 Feb 2020 12:02:16 +0000
From:=C2=A0 =C2=A0Chris Stevens <chris@sensingfeeling.com>
To:=C2=A0 =C2=A0 =C2=A0Timo Walther <twalthr@apache.org>



Hi Timo,

Thanks for your reply. This makes sense to me, how do I treat something as a POJO instead of a generic serialized BB type? Sorry relatively new to Java and Flink.

This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

=C2=A0 =C2=A0=C2=A0=C2=A0 private static final long serialVersionUID =3D 85= 82433437601788991L;
=C2=A0 =C2=A0=C2=A0=C2=A0 public String sensorId;
=C2=A0 =C2=A0=C2=A0=C2=A0 public String companyId;
=C2=A0 =C2=A0=C2=A0=C2=A0 public String label;
=C2=A0 =C2=A0=C2=A0=C2=A0 // public Date createdAt;
=C2=A0 =C2=A0=C2=A0=C2=A0 // public Date updatedAt;
=C2=A0 =C2=A0=C2=A0=C2=A0 public Integer uncomfortableFaceLimit;
=C2=A0 =C2=A0=C2=A0=C2=A0 public Boolean online;
=C2=A0 =C2=A0=C2=A0=C2=A0 public String capabilityId;
=C2=A0 =C2=A0=C2=A0=C2=A0 // public Date lastOnlineAt;
=C2=A0 =C2=A0=C2=A0=C2=A0 // public Date lastOfflineAt;
=C2=A0 =C2=A0=C2=A0=C2=A0 public Integer onlineVersionNumber;
=C2=A0 =C2=A0=C2=A0=C2=A0 public int status;
=C2=A0 =C2=A0=C2=A0=C2=A0 @Override
=C2=A0 =C2=A0=C2=A0=C2=A0 public String toString(){
=C2=A0 =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 return this.sensorId + &q= uot; - " + this.label;
=C2=A0 =C2=A0=C2=A0=C2=A0 }
}

Super simple really.

I'm not trying to upgrade anything as far as I know. Just making an operator state aware.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 11:55, Timo Walther <twalthr@apache.org
<mailto:twalthr@= apache.org>> wrote:

=C2=A0 =C2=A0 =C2=A0Hi Chris,

=C2=A0 =C2=A0 =C2=A0it seems there are field serialized into state that act= ually don't
=C2=A0 =C2=A0 =C2=A0belong there. You should aim to treat Sensor as a POJO = instead of a
=C2=A0 =C2=A0 =C2=A0Kryo
=C2=A0 =C2=A0 =C2=A0generic serialized black-box type.

=C2=A0 =C2=A0 =C2=A0Furthermore, it seems that field such as
=C2=A0 =C2=A0 =C2=A0"org.apache.logging.log4j.core.layout.AbstractCsvL= ayout" should not be
=C2=A0 =C2=A0 =C2=A0state. Is there a "transient" keyword missing= ?

=C2=A0 =C2=A0 =C2=A0Are you trying to upgrade your job or the Flink version= ?

=C2=A0 =C2=A0 =C2=A0Regards,
=C2=A0 =C2=A0 =C2=A0Timo



=C2=A0 =C2=A0 =C2=A0On 18.02.20 18:59, Chris Stevens wrote:
=C2=A0 =C2=A0 =C2=A0 > Hi there,
=C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > I'm trying to update state in one of my appli= cations hosted in
=C2=A0 =C2=A0 =C2=A0Kinesis
=C2=A0 =C2=A0 =C2=A0 > Data Analytics.
=C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > private transient ValueState<Sensor> sensor= State;
=C2=A0 =C2=A0 =C2=A0 > using sensorState.update(sensor);
=C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > Get error:
=C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > An error occurred: org.apache.flink.util.FlinkRun= timeException:
=C2=A0 =C2=A0 =C2=A0Error
=C2=A0 =C2=A0 =C2=A0 > while adding data to RocksDB
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBVa= lueState.java:108)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:= 50)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.jo= in(FrameMotionPathsToTelemetryJoinFunction.java:97)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.jo= in(FrameMotionPathsToTelemetryJoinFunction.java:48)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction= .coGroup(JoinedStreams.java:460)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFun= ction.apply(CoGroupedStreams.java:777)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIt= erableWindowFunction.process(InternalIterableWindowFunction.java:44)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIt= erableWindowFunction.process(InternalIterableWindowFunction.java:32)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitW= indowContents(WindowOperator.java:546)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEve= ntTime(WindowOperator.java:454)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWa= termark(InternalTimerServiceImpl.java:255)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advance= Watermark(InternalTimeServiceManager.java:128)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWate= rmark(AbstractStreamOperator.java:775)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 > org.apache.flink.streaming.run= time.io

<http= ://runtime.io>.StreamInputProcessor$ForwardingValveOutputHandler.han= dleWatermark(StreamInputProcessor.java:262)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAn= dOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)<= br> =C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputW= atermark(StatusWatermarkValve.java:111)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 > org.apache.flink.streaming.run= time.io

<http= ://runtime.io>.StreamInputProcessor.processInput(StreamInputProcesso= r.java:184)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStr= eamTask.java:105)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:= 308)
=C2=A0 =C2=A0 =C2=A0 > at org.apache.flink.runtime.taskmanager.Task.run(= Task.java:714)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.Thread.run(Thread.java:748)
=C2=A0 =C2=A0 =C2=A0 > Caused by: com.esotericsoftware.kryo.KryoExceptio= n:
=C2=A0 =C2=A0 =C2=A0 > java.lang.IllegalArgumentException: Unable to cre= ate serializer
=C2=A0 =C2=A0 =C2=A0 > "com.esotericsoftware.kryo.serializers.Field= Serializer" for class:
=C2=A0 =C2=A0 =C2=A0 > org.apache.logging.log4j.core.layout.AbstractCsvL= ayout
=C2=A0 =C2=A0 =C2=A0 > Serialization trace:
=C2=A0 =C2=A0 =C2=A0 > classes (sun.misc.Launcher$AppClassLoader)
=C2=A0 =C2=A0 =C2=A0 > classloader (java.security.ProtectionDomain)
=C2=A0 =C2=A0 =C2=A0 > cachedPDs (javax.security.auth.SubjectDomainCombi= ner)
=C2=A0 =C2=A0 =C2=A0 > combiner (java.security.AccessControlContext)
=C2=A0 =C2=A0 =C2=A0 > acc (sun.security.ssl.SSLSocketImpl)
=C2=A0 =C2=A0 =C2=A0 > connection (org.postgresql.core.PGStream)
=C2=A0 =C2=A0 =C2=A0 > pgStream (org.postgresql.core.v3.QueryExecutorImp= l)
=C2=A0 =C2=A0 =C2=A0 > transferModeRegistry (org.postgresql.core.v3.Simp= leQuery)
=C2=A0 =C2=A0 =C2=A0 > commitQuery (org.postgresql.jdbc.PgConnection) =C2=A0 =C2=A0 =C2=A0 > connection (org.postgresql.jdbc.PgResultSet)
=C2=A0 =C2=A0 =C2=A0 > val$rs
=C2=A0 =C2=A0 =C2=A0 >

(sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeClassAndOb= ject(Kryo.java:599)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav= a:88)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav= a:21)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObjectOrNu= ll(Kryo.java:577)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer= .java:495)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeClassAndOb= ject(Kryo.java:599)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(K= ryoSerializer.java:305)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSe= rializer.java:362)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(Composi= teSerializer.java:142)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValu= eInternal(AbstractRocksDBState.java:158)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValu= e(AbstractRocksDBState.java:178)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValu= e(AbstractRocksDBState.java:167)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBVa= lueState.java:106)
=C2=A0 =C2=A0 =C2=A0 > ... 20 more
=C2=A0 =C2=A0 =C2=A0 > Caused by: java.lang.IllegalArgumentException: Un= able to create
=C2=A0 =C2=A0 =C2=A0 > serializer
=C2=A0 =C2=A0 =C2=A0"com.esotericsoftware.kryo.serializers.FieldSerial= izer" for
=C2=A0 =C2=A0 =C2=A0 > class: org.apache.logging.log4j.core.layout.Abstr= actCsvLayout
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSeriali= zer(ReflectionSerializerFactory.java:48)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSeriali= zer(ReflectionSerializerFactory.java:26)
=C2=A0 =C2=A0 =C2=A0 > at
com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
=C2=A0 =C2=A0 =C2=A0 > at
com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
=C2=A0 =C2=A0 =C2=A0 > at
com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(Defaul= tClassResolver.java:56)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.getRegistration= (Kryo.java:461)
=C2=A0 =C2=A0 =C2=A0 > at com.twitter.chill.KryoBase.getRegistration(Kry= oBase.scala:52)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClass= Resolver.java:79)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo= .java:488)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.wr= ite(DefaultSerializers.java:239)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.wr= ite(DefaultSerializers.java:232)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObjectOrNu= ll(Kryo.java:577)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.CollectionSerializer.write(Collection= Serializer.java:75)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.CollectionSerializer.write(Collection= Serializer.java:22)
=C2=A0 =C2=A0 =C2=A0 > at com.esotericsoftware.kryo.Kryo.writeObject(Kry= o.java:523)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61= )
=C2=A0 =C2=A0 =C2=A0 > ... 62 more
=C2=A0 =C2=A0 =C2=A0 > Caused by: java.lang.reflect.InvocationTargetExce= ption
=C2=A0 =C2=A0 =C2=A0 > at sun.reflect.GeneratedConstructorAccessor42.new= Instance(Unknown
=C2=A0 =C2=A0 =C2=A0Source)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstru= ctorAccessorImpl.java:45)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.reflect.Constructor.newInstance(Cons= tructor.java:423)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSeriali= zer(ReflectionSerializerFactory.java:35)
=C2=A0 =C2=A0 =C2=A0 > ... 78 more
=C2=A0 =C2=A0 =C2=A0 > Caused by: java.lang.NoClassDefFoundError:
=C2=A0 =C2=A0 =C2=A0 > Lorg/apache/commons/csv/CSVFormat;
=C2=A0 =C2=A0 =C2=A0 > at java.lang.Class.getDeclaredFields0(Native Meth= od)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.Class.privateGetDeclaredFields(Class= .java:2583)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.Class.getDeclaredFields(Class.java:1= 916)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(F= ieldSerializer.java:193)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(F= ieldSerializer.java:156)
=C2=A0 =C2=A0 =C2=A0 > at
=C2=A0 =C2=A0 =C2=A0 >

com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSer= ializer.java:133)
=C2=A0 =C2=A0 =C2=A0 > ... 82 more
=C2=A0 =C2=A0 =C2=A0 > Caused by: java.lang.ClassNotFoundException:
=C2=A0 =C2=A0 =C2=A0 > org.apache.commons.csv.CSVFormat
=C2=A0 =C2=A0 =C2=A0 > at java.net.URLClassLoader.findClass(URLClassLoad= er.java:382)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.ClassLoader.loadClass(ClassLoader.ja= va:418)
=C2=A0 =C2=A0 =C2=A0 > at sun.misc.Launcher$AppClassLoader.loadClass(Lau= ncher.java:352)
=C2=A0 =C2=A0 =C2=A0 > at java.lang.ClassLoader.loadClass(ClassLoader.ja= va:351)
=C2=A0 =C2=A0 =C2=A0 > ... 88 more
=C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > Any help would be great. I tried manually includi= ng CSVFormat from
=C2=A0 =C2=A0 =C2=A0 > apache commons but didn't change anything. =C2=A0 =C2=A0 =C2=A0 >
=C2=A0 =C2=A0 =C2=A0 > Many thanks,
=C2=A0 =C2=A0 =C2=A0 > Chris Stevens
=C2=A0 =C2=A0 =C2=A0 > Head of Research & Development
=C2=A0 =C2=A0 =C2=A0 > +44 7565 034 595

--0000000000002d7955059eedb7f2--