flink-user mailing list archives

From Alexandru Vasiu <alexandru.ava...@gmail.com>
Subject Re: POJO ERROR
Date Fri, 20 Dec 2019 07:24:09 GMT
Hi,

We fixed it by converting the case class to a class.
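
The thread does not include the resulting class. As a minimal sketch of what such a conversion could look like, assuming the model described further down in the thread (a map, a list and a string), with a hypothetical placeholder type `Other` standing in for `other_case_class_obj`:

```scala
// Hypothetical stand-in for `other_case_class_obj`; the real type is not shown in the thread.
class Other(var name: String) {
  // No-argument constructor, as POJO-style classes are expected to have one.
  def this() = this(null)
}

// Regular class replacing
//   case class A(a: Map[String, Other], b: List[Other], c: String)
// `var` parameters give each field a getter and a Scala-style setter.
class A(var a: Map[String, Other], var b: List[Other], var c: String) {
  def this() = this(Map.empty, Nil, "")
}

object Demo {
  def main(args: Array[String]): Unit = {
    val x = new A(Map("k" -> new Other("v")), List(new Other("w")), "c")
    x.c = "changed" // fields are mutable, unlike case class fields
    println(x.c)
  }
}
```

This only shows the shape of the change. Whether Flink then handles the class as a POJO or as a generic type depends on the field types, and it is an assumption that the fixed class looked anything like this.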

Thank you,
Alex

On Thu, Dec 19, 2019 at 5:43 PM Timo Walther <twalthr@apache.org> wrote:

> Sorry, you are right. Maybe you can also share the full stack trace
> because I don't know where this guava library should be used.
>
> Regards,
> Timo
>
>
> On 19.12.19 14:50, Alexandru Vasiu wrote:
> > No, because scalaBuildVersion is the full Scala version including the
> > patch version, in this case 2.12.10, and we use it only where it is needed.
> > We use scalaVersion to select the Scala binary version of each library, so
> > the Flink artifact used is flink-streaming-scala_2.12.
> >
> > Alex
> >
> > On Thu, Dec 19, 2019 at 3:40 PM Timo Walther <twalthr@apache.org> wrote:
> >
> >     I see a mismatch between scalaBuildVersion and scalaVersion. Could
> >     this be the issue?
> >
> >     Regards,
> >     Timo
> >
> >
> >     On 19.12.19 14:33, Alexandru Vasiu wrote:
> >      > This is a part of my Gradle config:
> >      >
> >      > ext {
> >      >      scalaVersion = '2.12'
> >      >      flinkVersion = '1.9.1'
> >      >      scalaBuildVersion = "${scalaVersion}.10"
> >      >      scalaMockVersion = '4.4.0'
> >      >      circeGenericVersion = '0.12.3'
> >      >      circeExtrasVersion = '0.12.2'
> >      >      pardiseVersion = '2.1.1'
> >      >      slf4jVersion = '1.7.7'
> >      >      log4jVersion = '1.2.17'
> >      >      sourceDir = 'src/main/scala'
> >      >      testDir = 'src/test/scala'
> >      > }
> >      > repositories {
> >      >      mavenCentral()
> >      >      //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> >      > }
> >      > configurations {
> >      >      scalaCompilerPlugin
> >      > }
> >      > dependencies {
> >      >      implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
> >      >      // --------------------------------------------------------------
> >      >      // Compile-time dependencies that should NOT be part of the
> >      >      // shadow jar and are provided in the lib folder of Flink
> >      >      // --------------------------------------------------------------
> >      >      //compile "org.apache.flink:flink-java:${flinkVersion}"
> >      >      implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
> >      >      implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
> >      >      // --------------------------------------------------------------
> >      >      // Dependencies that should be part of the shadow jar, e.g.
> >      >      // connectors. These must be in the flinkShadowJar configuration!
> >      >      // --------------------------------------------------------------
> >      >      //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
> >      >      // https://mvnrepository.com/artifact/io.circe/
> >      >      implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
> >      >      implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
> >      >      implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
> >      >      // https://mvnrepository.com/artifact/org.scalamacros/paradise
> >      >      scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
> >      >      implementation "log4j:log4j:${log4jVersion}"
> >      >      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
> >      >      // Add test dependencies here.
> >      >      //testImplementation "junit:junit:4.12"
> >      >      testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
> >      >      // https://mvnrepository.com/artifact/org.scalamock/scalamock
> >      >      testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
> >      > }
> >      >
> >      > So all of them use the same Scala version. I cannot share the code,
> >      > but the main app looks like:
> >      >
> >      > val env = StreamExecutionEnvironment.getExecutionEnvironment
> >      > val stream = env
> >      >          .addSource(KAFKA_STREAM) // this will get us a stream with our
> >      >          // object model, which is like this: case class A(a: Map[String,
> >      >          // other_case_class_obj], b: List[other_case_class_obj], c: String)
> >      >          .flatMap(CustomFlatMap())
> >      >          .print
> >      >
> >      > Thank you,
> >      > Alex
> >      >
> >      > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <twalthr@apache.org>
> >      > wrote:
> >      >
> >      >     That sounds like a classloading or, most likely, a dependency
> >      >     issue.
> >      >
> >      >     Do all dependencies, including Flink, use the same Scala
> >      >     version? Could you maybe share some reproducible code with us?
> >      >
> >      >     Regards,
> >      >     Timo
> >      >
> >      >
> >      >     On 19.12.19 13:53, Alexandru Vasiu wrote:
> >      >      > I'm sorry for my last message, it might be incomplete.
> >      >      >
> >      >      > So I used case classes for my objects, but it doesn't work.
> >      >      >
> >      >      > I'm getting this error: "Exception in thread "main"
> >      >      > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> >      >      > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9"
> >      >      > when I'm trying to apply the map/flatMap function over the
> >      >      > stream (which is from a Kafka consumer).
> >      >      >
> >      >      >
> >      >      > Alex
> >      >      >
> >      >      > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> >      >      > <alexandru.avasiu@gmail.com> wrote:
> >      >      >
> >      >      >     I used `case class`, for example
> >      >      >     case class A(a: Map[String, String]), so it should work.
> >      >      >
> >      >      >     Alex
> >      >      >
> >      >      >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
> >      >      >     <twalthr@apache.org> wrote:
> >      >      >
> >      >      >         Hi Alex,
> >      >      >
> >      >      >         the problem is that `case class` classes are analyzed by
> >      >      >         Scala-specific code, whereas `class` classes are analyzed
> >      >      >         with Java-specific code. So I would recommend using a case
> >      >      >         class to make sure you stay in the "Scala world"; otherwise
> >      >      >         the fallback is the Java-based TypeExtractor.
> >      >      >
> >      >      >         For your custom Map, you can simply ignore this error
> >      >      >         message. It will fall back to the Java-based TypeExtractor
> >      >      >         and treat it as a generic type because it is not a POJO.
> >      >      >
> >      >      >         I hope this helps.
> >      >      >
> >      >      >         Regards,
> >      >      >         Timo
> >      >      >
> >      >      >
> >      >      >         On 19.12.19 12:41, Alexandru Vasiu wrote:
> >      >      >          > Hi,
> >      >      >          >
> >      >      >          > I use flink-scala version 1.9.1 and Scala 2.12.10, and I
> >      >      >          > defined a data type which is a bit more complex: it has a
> >      >      >          > list in it and even a dictionary. When I try to use a
> >      >      >          > custom map I get this error:
> >      >      >          >
> >      >      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
> >      >      >          > class A does not contain a setter for field fields
> >      >      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
> >      >      >          > class A cannot be used as a POJO type because not all
> >      >      >          > fields are valid POJO fields, and must be processed as
> >      >      >          > GenericType. Please read the Flink documentation on
> >      >      >          > "Data Types & Serialization" for details of the effect on
> >      >      >          > performance.
> >      >      >          >
> >      >      >          > Is there a fix for this? Or a workaround?
> >      >      >          >
> >      >      >          > Thank you,
> >      >      >          > Alex
> >      >      >
> >      >
> >
>
>
