flink-user mailing list archives

From aawhitaker <aawhita...@gmail.com>
Subject Re: Flink+avro integration
Date Wed, 21 Oct 2015 13:46:44 GMT
Till Rohrmann wrote
> What was your problem with using Java POJOs with the Scala API? 

Here's a quick example
<https://gist.github.com/AndrewWhitaker/e51308bb4b43f7ddefc3> that
demonstrates some of the problems I'm having. I used `max` in the example,
but I actually get an exception for most of the operations I try directly on
Java POJOs.

The "User" class referenced here is just the Avro example schema hydrated
into a Java POJO. I can post that or the entire project if it'd be helpful.

I included the stack trace of the exception in the gist, but I'll post it
here too:

Exception in thread "main" java.lang.UnsupportedOperationException: Specifying fields by name is onlysupported on Case Classes (for now).
	at org.apache.flink.api.scala.package$.fieldNames2Indices(package.scala:62)
	at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:466)
	at org.apache.flink.api.scala.DataSet.max(DataSet.scala:503)
	at SampleAvroJob$.main(SampleAvroJob.scala:12)
	at SampleAvroJob.main(SampleAvroJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

And if I use field position instead of field name, I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Aggregating on field positions is only possible on tuple data types.
	at org.apache.flink.api.scala.operators.ScalaAggregateOperator.<init>(ScalaAggregateOperator.java:71)
	at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:455)
	at org.apache.flink.api.scala.DataSet.max(DataSet.scala:482)
	at SampleAvroJob$.main(SampleAvroJob.scala:12)
	at SampleAvroJob.main(SampleAvroJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

A workaround is to use `.map` to map to tuples first, but this seems a
little clunky.
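
For reference, the tuple workaround looks roughly like the sketch below. The
`User` class here is a hypothetical, simplified stand-in for the
Avro-generated POJO (the real one has more fields), and the sample values
and the object name `TupleWorkaround` are made up for illustration.

```scala
import org.apache.flink.api.scala._

// Hypothetical stand-in for the Avro-generated User POJO (assumption:
// only two fields, plain String/Int instead of the Avro types).
class User(var name: String, var favoriteNumber: Int) {
  def this() = this("", 0) // no-arg constructor, as a POJO would have
  def getName: String = name
  def getFavoriteNumber: Int = favoriteNumber
}

object TupleWorkaround {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data in place of reading an Avro file.
    val users: DataSet[User] =
      env.fromElements(new User("alice", 1), new User("bob", 7))

    // Map each POJO to a Scala tuple first, so that position-based
    // aggregation is allowed.
    val asTuples: DataSet[(String, Int)] =
      users.map(u => (u.getName, u.getFavoriteNumber))

    // Aggregate on tuple position 1 (the favorite number).
    asTuples.max(1).print()
  }
}
```

Note that only the aggregated field is meaningful in the result; as far as I
can tell, the other tuple field is not guaranteed to come from the same
record.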

Thanks!