From: Kostas Kloudas
To: Colin Williams
Cc: user@flink.apache.org
Date: Mon, 9 Oct 2017 16:37:01 +0200
Subject: Re: serialization error when using multiple metrics counters

Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. So if you instantiate the counters outside open(),
Flink cannot serialize your function when it tries to ship your code to the cluster, and you get this exception.

You can have a look at the docs for an example: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
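Adapted to the three counters from the question, the pattern from the metrics documentation might look roughly like this: the counters are @transient fields that are still null when Flink serializes the function, and they are only created in open(), which runs on the worker before the first record is processed. This is a sketch against the 1.3-era RichMapFunction API; the class name CountingParseMapper, the String input and output types, and the classification logic in map() are hypothetical placeholders for the real ParseResult handling, which the thread does not show.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical mapper: String in/out stands in for the real ParseResult[LineProtocol] types.
class CountingParseMapper extends RichMapFunction[String, String] {

  // @transient: skipped when Flink serializes the function, so these fields
  // are null on the client and are only filled in on the worker by open().
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _
  @transient private var errorCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // The runtime context (and its metric group) is available from here on.
    val metrics = getRuntimeContext.getMetricGroup
    successCounter = metrics.counter("successfulParse")
    failedCounter = metrics.counter("failedParse")
    errorCounter = metrics.counter("errorParse")
  }

  override def map(line: String): String = {
    // Hypothetical classification; replace with the real ParseResult matching.
    if (line.startsWith("ERROR")) errorCounter.inc()
    else if (line.trim.isEmpty) failedCounter.inc()
    else successCounter.inc()
    line
  }
}
```

It would be applied like any other map function, for example `lines.map(new CountingParseMapper)` on a hypothetical `DataStream[String]` named `lines`.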

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:

I've created a RichMapFunction in Scala with multiple counters like:

    lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
    lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
    lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that a single counter works without issues. However, with more than one counter I get a serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
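Both Cause lines come from plain JDK serialization: ClosureCleaner.clean hands the function to InstantiationUtil.serializeObject, which calls ObjectOutputStream.writeObject, and that rejects the SimpleCounter stored in one of the function's fields. The following sketch reproduces the mechanism without Flink. FakeCounter, the three Holder classes, and trySerialize are hypothetical stand-ins, and trySerialize only approximates the ClosureCleaner check. It also suggests why the lazy vals in the question do not help on their own: an unread lazy val serializes as null, but once something reads it before the function reaches map() (which may be what the tests do), the counter sits in an ordinary field and is serialized along with the function.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.flink.metrics.SimpleCounter: like it, NOT Serializable.
class FakeCounter {
  var count = 0L
  def inc(): Unit = count += 1
}

// 1. Plain val: the counter is created with the object, so it is always serialized.
class EagerHolder extends Serializable {
  val counter = new FakeCounter
}

// 2. lazy val: the backing field stays null until the first access,
//    but after that the counter is an ordinary field and gets serialized too.
class LazyHolder extends Serializable {
  lazy val counter = new FakeCounter
}

// 3. @transient var: skipped by Java serialization and created later,
//    the way Flink's open() would create it on the worker.
class TransientHolder extends Serializable {
  @transient var counter: FakeCounter = _
  def open(): Unit = { counter = new FakeCounter }
}

object SerializationDemo {
  // Approximates the ClosureCleaner check: Java-serialize the object and report the outcome.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size()) // number of bytes written
    } catch {
      case e: NotSerializableException => Left(s"NotSerializableException: ${e.getMessage}")
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(trySerialize(new EagerHolder)) // Left: the eager counter is a field

    val untouched = new LazyHolder
    println(trySerialize(untouched)) // Right: the lazy field is still null

    val touched = new LazyHolder
    touched.counter.inc() // forces the lazy val
    println(trySerialize(touched)) // Left: the forced counter is now a field

    val opened = new TransientHolder
    opened.open()
    println(trySerialize(opened)) // Right: the transient field is skipped
  }
}
```

Running SerializationDemo.main prints a Left for the eager holder and for the lazy holder whose counter has been read, and a Right for the untouched lazy holder and for the transient holder after open().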
