flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Serialisation problem
Date Mon, 21 Dec 2015 09:56:26 GMT
Hi,

In your program, you apply a distinct transformation to a data set that has
a (nested) GValue[] type. Distinct requires that all fields are comparable
with each other. Therefore, all fields of the data set's type must be valid
key types.
However, Flink does not support object arrays as key types. This holds
regardless of the component type and includes GValue[], whether or not
GValue implements Comparable. Object array types simply can't be used as
keys right now :-(
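The underlying difficulty can be illustrated without Flink. A JVM array such as `GValue[]` inherits `equals` and `hashCode` from `Object`, so two arrays with identical contents are still different values unless some code compares them element by element. The sketch below uses a hypothetical `(Int, Array[Int])` record; it only demonstrates JVM array semantics, not Flink's key validation, which rejects the array type when the distinct operator is constructed (see the stack trace further down), before any data is compared.

```scala
// Plain Scala, no Flink involved: JVM arrays use reference equality.
object ArrayEqualityDemo {
  // Hypothetical record shape: a key field plus an array payload.
  val records: Seq[(Int, Array[Int])] = Seq((1, Array(1, 2)), (1, Array(1, 2)))

  // Seq.distinct relies on equals/hashCode. The two arrays are different
  // objects, so both records survive although their contents are identical.
  val naiveDistinctCount: Int = records.distinct.size

  // Converting each array to a Seq compares contents element-wise,
  // so the two records are now recognized as duplicates.
  val contentDistinctCount: Int =
    records.map { case (k, a) => (k, a.toSeq) }.distinct.size
}
```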

I see two ways to add this functionality. In both cases you need to get a
bit into the details of Flink's type system, serialization, and comparators.

1) You implement your own type information for GValue[] arrays, plus a
serializer and a comparator, and manually inject this information.

2) You extend ObjectArrayTypeInfo such that it supports key operations if
the component type supports key operations. This would require implementing
a TypeComparator and some modifications to ObjectArrayTypeInfo. This change
would also be a valuable contribution to Flink.
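The core of option 2 is that an array comparator can delegate to the comparator of its component type and compare the arrays element by element. The following plain-Scala sketch expresses that idea as a lexicographic `Ordering[Array[T]]` built from an `Ordering[T]`. `ArrayOrdering` is a hypothetical name, and this is not Flink's API: a real Flink `TypeComparator` has further responsibilities (for example hashing, comparing serialized data, and normalized keys) that are not covered here.

```scala
// Hypothetical helper: lexicographic order on arrays, delegating each
// element comparison to the Ordering of the component type.
object ArrayOrdering {
  def lexicographic[T](implicit elem: Ordering[T]): Ordering[Array[T]] =
    new Ordering[Array[T]] {
      def compare(a: Array[T], b: Array[T]): Int = {
        val shared = math.min(a.length, b.length)
        var i = 0
        while (i < shared) {
          val c = elem.compare(a(i), b(i))
          if (c != 0) return c // the first differing element decides
          i += 1
        }
        // All shared positions are equal: the shorter array sorts first.
        Integer.compare(a.length, b.length)
      }
    }
}
```

Given some `Ordering[GValue]`, `ArrayOrdering.lexicographic[GValue]` would describe the element-wise order that a comparator for `GValue[]` keys has to implement.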

I recommend having a look at Flink's other type infos, serializers, and
comparators to learn how this is done in Flink.
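Whichever option is taken, the comparison on `GValue` itself has to be a consistent total order. The `compare` method quoted further down does not provide one across subtypes: for operands of different subtypes it either returns 0 or throws a `ClassCastException` from `this.asInstanceOf[...]`. A minimal sketch, assuming a hypothetical re-declaration of the `GValue` hierarchy as plain case classes (it would clash with the thread's own definitions if both were compiled together), orders values first by subtype and then by the wrapped value. The subtype ranking is an arbitrary choice, and wiring this `Ordering` into Flink's type information, serializer, and comparator classes is not shown.

```scala
// Hypothetical ADT mirroring the GValue hierarchy from this thread.
sealed trait GValue
final case class GInt(v: Int) extends GValue
final case class GDouble(v: Double) extends GValue
final case class GString(v: String) extends GValue
final case class GNull() extends GValue

object GValueOrdering {
  // Arbitrary but fixed rank per subtype, so values of different
  // subtypes never compare as equal.
  private def rank(g: GValue): Int = g match {
    case GNull()    => 0
    case GInt(_)    => 1
    case GDouble(_) => 2
    case GString(_) => 3
  }

  // Total order: same subtype compares by value, otherwise by rank.
  implicit val ordering: Ordering[GValue] = new Ordering[GValue] {
    def compare(a: GValue, b: GValue): Int = (a, b) match {
      case (GInt(x), GInt(y))       => Integer.compare(x, y)
      case (GDouble(x), GDouble(y)) => java.lang.Double.compare(x, y)
      case (GString(x), GString(y)) => x.compareTo(y)
      case _                        => Integer.compare(rank(a), rank(b))
    }
  }
}
```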

Best,
Fabian




2015-12-20 22:01 GMT+01:00 Abdulrahman kaitoua <abdulrahman.kaitoua@outlook.com>:

>
> I still have the same problem even after making GValue extend Comparable.
> I think the problem might be that Array[GValue] is not comparable, rather
> than GValue itself, but I do not know how to fix this in Flink. Maybe some
> implicit ordering would work (and why is this field, Array[GValue],
> compared at all?). I appreciate any help.
>
> I still do not understand why this was not a problem in previous versions.
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) cannot be used as key.
>     at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:308)
>     at org.apache.flink.api.java.operators.DistinctOperator.<init>(DistinctOperator.java:56)
>     at org.apache.flink.api.scala.DataSet.distinct(DataSet.scala:740)
>     at it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricMap4$.execute(GenometricMap4.scala:71)
>
> sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue]{
>   def compare(o : GValue) : Int = {
>     o match {
>       case GDouble(v) => this.asInstanceOf[GDouble].v compare v
>       case GString(v) => this.asInstanceOf[GString].v compare v
>       case GInt(v) => this.asInstanceOf[GInt].v compare v
>       case _ => 0
>     }
>   }
>   // Match on o, then compare o's value with this value.
>   def equal(o : GValue) : Boolean = {
>     o match {
>       case GInt(value) => try{value.equals(this.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
>       case GDouble(value) => try{value.equals(this.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
>       case GString(value) => try{value.equals(this.asInstanceOf[GString].v)} catch { case e : Throwable => false }
>       case GNull() => this.isInstanceOf[GNull]
>       case _ => false
>     }
>   }
>   override def compareTo(o: GValue): Int = {
>     o match {
>       case GInt(value) => try{this.asInstanceOf[GInt].v.compareTo(value)} catch { case e : Throwable => 0 }
>       case GDouble(value) => try{this.asInstanceOf[GDouble].v.compareTo(value)} catch { case e : Throwable => 0 }
>       case GString(value) => try{this.asInstanceOf[GString].v.compareTo(value)} catch { case e : Throwable => 0 }
>       case GNull() => 0
>       case _ => 0
>     }
>   }
> }
>
> -----------------------------------------------------------------
> Abdulrahman Kaitoua
> -----------------------------------------------------------------
> Ph.D. Candidate at Politecnico Di Milano
>
> > Subject: Re: Serialisation problem
> > From: aljoscha@apache.org
> > Date: Mon, 14 Dec 2015 10:42:22 +0100
> > To: user@flink.apache.org
>
> >
> > Hi,
> > the problem could be that GValue is not Comparable. Could you try making
> > it extend Comparable (the Java Comparable)?
> >
> > Cheers,
> > Aljoscha
> > > On 12 Dec 2015, at 20:43, Robert Metzger <rmetzger@apache.org> wrote:
> > >
> > > Hi,
> > >
> > > Can you check the log output in your IDE or the log files of the Flink
> > > client (./bin/flink)? The TypeExtractor logs why a class is not
> > > recognized as a POJO.
> > >
> > > The log statements look like this:
> > >
> > > 20:42:43,465 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.dataartisans.debug.MyPojo must have a default constructor to be used as a POJO.
> > >
> > >
> > >
> > > On Thu, Dec 10, 2015 at 11:24 PM, Abdulrahman kaitoua <abdulrahman.kaitoua@outlook.com> wrote:
> > >
> > >
> > > Hello,
> > >
> > > I would like some directions to make my code work again (thanks in
> > > advance). My code used to work on versions up to 0.9.1, but when I
> > > switched to 0.10 or 0.10.1 I got the following exception.
> > >
> > > This type (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) cannot be used as key
> > >
> > > I understand that it is related to the serialisation of objects. I
> > > tried to follow the POJO guidelines in
> > > https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization
> > > but could not make it work.
> > >
> > > My data set contains a set of tuples as key and one array of GValues.
> > > This is a snapshot of the GValue class:
> > >
> > >
> > > sealed trait GValue extends Serializable with Ordered[GValue]{
> > >   def compare(o : GValue) : Int = {
> > >     o match {
> > >       case GDouble(v) => this.asInstanceOf[GDouble].v compare v
> > >       case GString(v) => this.asInstanceOf[GString].v compare v
> > >       case GInt(v) => this.asInstanceOf[GInt].v compare v
> > >       case GNull() => 0
> > >     }
> > >   }
> > >   // Match on o, then compare o's value with this value.
> > >   def equal(o : GValue) : Boolean = {
> > >     o match {
> > >       case GInt(value) => try{value.equals(this.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
> > >       case GDouble(value) => try{value.equals(this.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
> > >       case GString(value) => try{value.equals(this.asInstanceOf[GString].v)} catch { case e : Throwable => false }
> > >       case GNull() => this.isInstanceOf[GNull]
> > >       case _ => false
> > >     }
> > >   }
> > > }
> > >
> > > /**
> > >  * Represents a @GValue that contains an integer
> > >  * @deprecated
> > >  * @param v
> > >  */
> > > case class GInt(v: Int) extends GValue{
> > >   def this() = this(0)
> > >   override def toString() : String = {
> > >     v.toString
> > >   }
> > >   override def equals(other : Any) : Boolean = {
> > >     other match {
> > >       case GInt(value) => value.equals(v)
> > >       case _ => false
> > >     }
> > >   }
> > > }
> > >
> > > /**
> > >  * Represents a @GValue that contains a number as a @Double
> > >  * @param v number
> > >  */
> > > case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{
> > >
> > >   def this() = this(0.0)
> > >
> > >   override def equals(other : Any) : Boolean = {
> > >     other match {
> > >       case GDouble(value) => value.equals(v)
> > >       case _ => false
> > >     }
> > >   }
> > > }
> > >
> > > /**
> > >  * Represents a @GValue that contains a @String
> > >  * @param v string
> > >  */
> > > case class GString(v: String) extends GValue{
> > >   def this() = this(".")
> > >   override def toString() : String = {
> > >     v.toString
> > >   }
> > >   override def equals(other : Any) : Boolean = {
> > >     other match {
> > >       case GString(value) => value.equals(v)
> > >       case _ => false
> > >     }
> > >   }
> > > }
> > >
> > > Regards,
> > >
> > > -----------------------------------------------------------------
> > > Abdulrahman Kaitoua
> > > -----------------------------------------------------------------
> > > Ph.D. Candidate at Politecnico Di Milano
> > >
> > >
> >
>
