Subject: Re: NotSerializableException
From: Stephan Ewen
Date: Thu, 9 Jun 2016 20:38:04 +0200
To: user@flink.apache.org

You can also make the KeySelector a static inner class. That should work as well.
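[Editor's note: a minimal sketch of that suggestion, assuming the same field-name lookup as in Tarandeep's RecordsFilterer quoted below; the name FieldKeySelector and its placement as a static nested class are illustrative, not code from the thread.]

import java.util.UUID;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.functions.KeySelector;

public class RecordsFilterer<T extends GenericRecord> {

    // Static nested class: unlike an anonymous inner class, it carries no
    // hidden reference to the enclosing RecordsFilterer, so only the selector
    // itself (and its String field) has to be serialized.
    private static class FieldKeySelector<R extends GenericRecord>
            implements KeySelector<R, String> {

        private final String fieldName;

        FieldKeySelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getKey(R record) throws Exception {
            String s = (String) record.get(fieldName);
            // Same null handling as the original: fall back to a random UUID
            // so null keys never collide with real ids.
            return s != null ? s : UUID.randomUUID().toString();
        }
    }

    // In addFilterFlag, the coGroup key extraction then becomes:
    //     .where(new FieldKeySelector<T>(fieldName))
}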
On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarandeep@gmail.com> wrote:

Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to the Flink engine itself.

@Fabian: thanks for the optimization tip.

This is how I got it working (with a hack): in my dataset, the join field/key can be null; otherwise .where(fieldName) works and I don't get the not-serializable exception. So I applied a MapFunction to the DataSet and put a dummy value in the join field/key wherever it was null. Then, in the join function, I change it back to null.

Best,
Tarandeep

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi,
the problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer RecordsFilterer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug.

Best,
Aljoscha

On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhueske@gmail.com> wrote:

Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java serialization).
I say "suggests" because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function. It is a helper class to construct a DataSet program, so it should not be shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer rf = new RecordsFilterer();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filteredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
    .leftOuterJoin(distIds)
    .where(KEYSELECTOR)
    .equalTo("*")   // use the full string as the key
    .with(JOINFUNC) // set the Boolean to false if right == null, true otherwise

Best, Fabian
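[Editor's note: filling in Fabian's placeholders, a hedged sketch of what the rewritten addFilterFlag might look like; the FlagJoiner class and the reuse of the FieldKeySelector from the earlier sketch are assumptions, not code from the thread.]

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class RecordsFilterer<T extends GenericRecord> {

    // Static nested join function, again to avoid dragging the enclosing
    // RecordsFilterer instance into the serialized user code.
    private static class FlagJoiner<R extends GenericRecord>
            implements JoinFunction<R, String, Tuple2<Boolean, R>> {
        @Override
        public Tuple2<Boolean, R> join(R record, String id) {
            // id is null when no matching filter id exists for this record.
            return new Tuple2<>(id != null, record);
        }
    }

    public DataSet<Tuple2<Boolean, T>> addFilterFlag(
            DataSet<T> records, DataSet<String> filteredIds, String fieldName) {

        // De-duplicate the ids so the left outer join emits each record exactly once.
        DataSet<String> distIds = filteredIds.distinct();

        return records
                .leftOuterJoin(distIds)
                .where(new FieldKeySelector<T>(fieldName)) // static selector from the earlier sketch
                .equalTo("*")                              // the full String element is the key
                .with(new FlagJoiner<T>());
    }
}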
2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarandeep@gmail.com>:

Hi,

I am getting a NotSerializableException in this class:

public class RecordsFilterer<T extends GenericRecord> {

    public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset, DataSet<String> filteredIds, String fieldName) {
        return dataset.coGroup(filteredIds)
                .where(new KeySelector<T, String>() {
                    @Override
                    public String getKey(T t) throws Exception {
                        String s = (String) t.get(fieldName);
                        return s != null ? s : UUID.randomUUID().toString();
                    }
                })
                .equalTo((KeySelector<String, String>) s -> s)
                .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
                    @Override
                    public void coGroup(Iterable<T> records, Iterable<String> ids,
                                        Collector<Tuple2<Boolean, T>> collector) throws Exception {
                        boolean filterFlag = false;
                        for (String id : ids) {
                            filterFlag = true;
                        }

                        for (T record : records) {
                            collector.collect(new Tuple2<>(filterFlag, record));
                        }
                    }
                });
    }
}

What I am trying to do is write generic code that will join Avro records (of different types) with String records and, if there is a match, add a filter flag. This way I can use the same code for different Avro record types. But I am getting this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
    ... 17 more
Caused by: java.io.NotSerializableException: RecordsFilterer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    ... 19 more

Please help me understand why I get this exception and how to fix it (rewrite the code, maybe?).

Thanks,
Tarandeep
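[Editor's note: for reference, a minimal sketch of the null-key workaround Tarandeep describes earlier in the thread, assuming a placeholder string that can never occur as a real id; the constant and the helper names are illustrative, not code from the thread.]

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;

public class NullKeyMasker {

    // Hypothetical sentinel; it must never appear as a real join key.
    static final String NULL_PLACEHOLDER = "__NULL_KEY__";

    // Replace null join keys with the placeholder before the coGroup/join;
    // the join function can then map the placeholder back to null.
    public static <T extends GenericRecord> DataSet<T> maskNullKeys(
            DataSet<T> records, final String fieldName) {
        return records
                .map(new MapFunction<T, T>() {
                    @Override
                    public T map(T record) {
                        if (record.get(fieldName) == null) {
                            record.put(fieldName, NULL_PLACEHOLDER);
                        }
                        return record;
                    }
                })
                // Help the type extractor with the generic T by reusing the
                // input's type information for the output.
                .returns(records.getType());
    }
}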