flink-user mailing list archives

From Joshua Griffith <JGriff...@CampusLabs.com>
Subject Re: Nested Field Expressions with Rows
Date Mon, 10 Jul 2017 16:59:35 GMT
Indeed that worked. Thanks!

> On Jul 10, 2017, at 11:57 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi,
> 
> You have to add the implicit value in the main() method before you call .map(rowFn) and
> not in the MapFunction.
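
For reference, a minimal sketch of that placement, reusing the MakeRow and integerTuple helpers from the full example further down in this thread (untested, assuming the Flink 1.3.x Scala DataSet API):

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.createLocalEnvironment()
      val rowFn = new MakeRow

      // Declared here in main(), the implicit is in scope when .map(rowFn) is
      // called below, so the Scala DataSet API uses this RowTypeInfo instead of
      // falling back to GenericType<Row>.
      implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
        Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
        Array("id", "value")
      )

      val ints = 0 until 1000
      val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
      val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

      val evenRows = env.fromCollection(evenIntegers).map(rowFn)
      val oddRows = env.fromCollection(oddIntegers).map(rowFn)

      // "id" now resolves as a key expression because both inputs carry a RowTypeInfo.
      evenRows.join(oddRows).where("id").equalTo("id").print()
    }
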
> 
> Best, Fabian
> 
> 
> 2017-07-10 18:54 GMT+02:00 Joshua Griffith <JGriffith@campuslabs.com>:
> Hello Fabian,
> 
> Thank you for your response. I tried your recommendation but I’m getting the same issue.
> Here’s the altered MakeRow MapFunction I tried:
> 
>   class MakeRow extends MapFunction[(Integer, Integer), Row] {
>     implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>   }
> 
> In stepping through the code execution, it looks like the problem is that Row.isKeyType()
> returns false <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>.
> Any recommendations?
> 
> Thanks,
> 
> Joshua
> 
> 
>> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>> Hi Joshua,
>> 
>> thanks for reporting this issue. Your code is fine, but IMO there is a bug in the Scala
>> DataSet API: it simply does not respect the type information provided by the
>> ResultTypeQueryable[Row] interface and defaults to a GenericType.
>> 
>> I think this should be fixed. I'll open a JIRA issue for that.
>> 
>> You can explicitly declare types with implicits if you put the following lines above
>> the lines in which you apply rowFn to the DataSet.
>> 
>> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>   Array("id", "value")
>> )
>> When you do this, you can also remove the ResultTypeQueryable interface from
>> the MapFunction.
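
With the implicit declared at the call site as described above, the MapFunction itself can stay minimal; a sketch based on the MakeRow class from the example below, with the ResultTypeQueryable mix-in dropped:

    class MakeRow extends MapFunction[(Integer, Integer), Row] {
      // No getProducedType here: the implicit TypeInformation[Row] in the
      // calling scope describes the produced row (fields "id" and "value").
      override def map(tuple: (Integer, Integer)): Row = tuple match {
        case (value, id) => Row.of(id, value)
      }
    }
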
>> 
>> Cheers, Fabian
>> 
>> 
>> 
>> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <JGriffith@campuslabs.com>:
>> Thank you for your response, Nico. Below is a simple case where I’m trying to join
>> on Row fields:
>> 
>> package com.github.hadronzoo.rowerror
>> 
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.types.Row
>> 
>> object Main {
>> 
>>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>>       case (value, id) => Row.of(id, value)
>>     }
>> 
>>     override def getProducedType: TypeInformation[Row] =
>>       new RowTypeInfo(
>>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>         Array("id", "value")
>>       )
>>   }
>> 
>>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match { case (a, b) => (a, b) }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.createLocalEnvironment()
>>     val rowFn = new MakeRow
>> 
>>     val ints = 0 until 1000
>>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>> 
>>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>> 
>>     evenRows.join(oddRows).where("id").equalTo("id").print()
>>   }
>> }
>> 
>> Executing the above yields the following error:
>> 
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
>> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>> 	at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>> 	at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>> 	at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>> 	at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>> 
>> For my application I only have TypeInformation at runtime (before the execution graph
>> is built). Is it possible to use Row fields in join operations, or is there an error with my
>> implementation?
>> 
>> Thanks for your help,
>> 
>> Joshua
>> 
>>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <nico@data-artisans.com> wrote:
>>> 
>>> Can you show a minimal example of the query you are trying to run?
>>> Maybe Timo or Fabian (cc'd) can help.
>>> 
>>> 
>>> Nico
>>> 
>>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>>> Hello,
>>>> 
>>>> When using nested field expressions like “Account.Id” with nested rows, I
>>>> get the following error, “This type
>>>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>>>> a way to make nested field expressions work with nested rows?
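
For the nested case, a key expression like "Account.Id" resolves only when the row, and the nested row it drills into, are described by composite type information such as RowTypeInfo rather than a GenericType. A sketch of what such a nested RowTypeInfo might look like (the field names "Account", "Id", "Name", and "Value" are illustrative, not taken from the original post):

    // The inner "Account" row gets its own RowTypeInfo...
    val accountType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Array("Id", "Name")
    )

    // ...and is embedded as a field of the outer row, which makes "Account.Id"
    // addressable as a nested field expression.
    implicit val nestedRowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](accountType, BasicTypeInfo.STRING_TYPE_INFO),
      Array("Account", "Value")
    )

    // With this type information attached to both inputs, a key expression such as
    // where("Account.Id").equalTo("Account.Id") should resolve, whereas a
    // GenericType<org.apache.flink.types.Row> triggers the error quoted above.
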
>>> 
>>>> Thanks,
>>>> 
>>>> Joshua
>>> 
>> 
>> 
> 
> 

