flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: serialization error when using multiple metrics counters
Date Mon, 09 Oct 2017 14:37:01 GMT
Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. If you instantiate the counters outside open(),
they become fields of the function object, and Flink cannot serialize that object when it
tries to ship your code to the cluster, so you get the exception.

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
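
As a rough sketch of that pattern in Scala (the class name, the String-to-Int parsing, and the -1 fallback are made up for illustration; only the counter names come from your mail), the counters can be declared as transient fields and registered in open():

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical mapper: the class name, input type, and parse logic are
// placeholders; only the counter names are taken from the question.
class ParseCountingMapper extends RichMapFunction[String, Int] {

  // @transient keeps these fields out of Java serialization. They stay
  // unset until open() runs on the task that executes the function.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // The runtime context is only available once the function is deployed,
    // so the counters are registered here rather than in the constructor.
    val group = getRuntimeContext.getMetricGroup
    successCounter = group.counter("successfulParse")
    failedCounter = group.counter("failedParse")
  }

  override def map(value: String): Int =
    try {
      val parsed = value.trim.toInt
      successCounter.inc()
      parsed
    } catch {
      case _: NumberFormatException =>
        failedCounter.inc()
        -1 // placeholder result for unparseable input
    }
}
```

With this layout the function object holds no Counter instance when the job graph is built, so the serialization check in DataStream.map should have nothing non-serializable to trip over.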

Thanks,
Kostas

> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:
> 
> I've created a RichMapFunction in scala with multiple counters like:
> 
>    lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
>    lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
>    lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
> 
> which I increment in the map function. While testing I noticed that I have no issues
> with using a single counter. However, with more than one counter I get a serialization
> error.
> 
> Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm
> doing wrong?
> 
> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...

