flink-user mailing list archives

From Seth Wiesman <swies...@mediamath.com>
Subject Re: serialization error when using multiple metrics counters
Date Mon, 09 Oct 2017 20:34:47 GMT
When a Scala class contains a single lazy val, it is implemented using a boolean flag to track whether
the field has been evaluated. When a class contains multiple lazy vals, they are implemented
with a bit mask shared among the variables. This can lead to inconsistencies as to whether
serialization forces evaluation of the field. In general, lazy vals should always be marked
@transient for expected behavior.
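To make this concrete, here is a minimal, stdlib-only sketch with no Flink dependency. `FakeCounter` is a hypothetical stand-in for Flink's `org.apache.flink.metrics.SimpleCounter`, which is likewise not `Serializable`. `PlainCounters`, `TransientCounters` and `LazyValSerialization` are illustrative names only. The sketch shows that Java serialization of an object with plain lazy vals succeeds or fails depending on whether a lazy val has already been evaluated, and that marking the lazy vals `@transient` avoids the failure in both cases.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Flink's SimpleCounter: deliberately NOT Serializable.
class FakeCounter {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

// Plain lazy vals: the backing fields are ordinary fields, so once a lazy val
// has been evaluated its value is written out by Java serialization.
class PlainCounters extends Serializable {
  lazy val success = new FakeCounter
  lazy val failed = new FakeCounter
}

// @transient lazy vals: the backing fields are skipped by Java serialization
// and are recomputed on first access after deserialization.
class TransientCounters extends Serializable {
  @transient lazy val success = new FakeCounter
  @transient lazy val failed = new FakeCounter
}

object LazyValSerialization {
  /** Returns true if Java serialization of `obj` succeeds. */
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val plain = new PlainCounters
    println(serializes(plain)) // true: no lazy val has been evaluated yet
    plain.success.inc()        // forces evaluation; the field now holds a FakeCounter
    println(serializes(plain)) // false: NotSerializableException for FakeCounter

    val transientOnes = new TransientCounters
    transientOnes.success.inc()
    println(serializes(transientOnes)) // true: the transient field is skipped
  }
}
```

In a Flink function, the same annotation means each deserialized instance should create its own counter the first time it touches the field on the task manager.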

Seth

From: Stephan Ewen <sewen@apache.org>
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas <k.kloudas@data-artisans.com>
Cc: Colin Williams <colin.williams.seattle@gmail.com>, user <user@flink.apache.org>
Subject: Re: serialization error when using multiple metrics counters

Interesting. Is there a quirk in Scala whereby using multiple lazy variables can result
in eager initialization of some of them?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. So if you instantiate the counters outside
open(), Flink cannot serialize your function when it tries to ship it to the cluster,
and you get this exception.

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
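A minimal sketch of that pattern, written against the Flink 1.x `RichMapFunction` API used in this thread (`open(Configuration)`; newer Flink versions may expose a different open() signature). The three counter names come from Colin's message. The class name, the `String` input, the `Option[Int]` output and the integer-parsing rule are hypothetical placeholders for his `ParseResult` logic.

```scala
import scala.util.{Failure, Success, Try}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical function: parses each line as an Int. Types and parsing rule
// are illustrative only; the counter names match the thread.
class ParseResultUnwrapper extends RichMapFunction[String, Option[Int]] {

  // Counters are not Serializable. The fields are @transient and stay null
  // until open(), so they are never part of the serialized function.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _
  @transient private var errorCounter: Counter = _

  // open() runs on the task manager after the function has been deserialized,
  // when the runtime context and its metric group are available.
  override def open(parameters: Configuration): Unit = {
    val group = getRuntimeContext.getMetricGroup
    successCounter = group.counter("successfulParse")
    failedCounter = group.counter("failedParse")
    errorCounter = group.counter("errorParse")
  }

  override def map(value: String): Option[Int] = {
    if (value == null) {
      errorCounter.inc()
      None
    } else {
      Try(value.trim.toInt) match {
        case Success(n) =>
          successCounter.inc()
          Some(n)
        case Failure(_) =>
          failedCounter.inc()
          None
      }
    }
  }
}
```

Compared with declaring the counters as `@transient lazy val`, this makes the point of creation explicit. Both approaches keep the counters out of what the ClosureCleaner serializes.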

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:

I've created a RichMapFunction in Scala with multiple counters, like this:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that I have no issues when
using a single counter. However, with more than one counter I get a serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing
wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...

