flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: TableAPI - Join on two keys
Date Fri, 17 Apr 2015 07:42:26 GMT
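So, the first error is caused by a "feature" of the Java API that removes
duplicate fields from keys. An equi-join of key (0,0) with key (0,1)
therefore throws an error, because one 0 is removed from the first key
and the two keys no longer have the same number of fields. Your
`nodeID = src && nodeID = target` uses `nodeID` twice on the left side.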
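A minimal sketch of that behaviour in plain Scala, with no Flink dependency. `KeyDedupSketch`, `normalize` and `checkEquiJoinKeys` are hypothetical names; the model only assumes what is stated above, namely that duplicate key fields are dropped before the two keys are compared.

```scala
// Hypothetical model, not Flink's actual implementation: duplicate field
// positions are dropped from each key before the keys are compared.
object KeyDedupSketch {
  // Keep the first occurrence of each field position, preserving order.
  def normalize(keyFields: Seq[Int]): Seq[Int] = keyFields.distinct

  // Fails when the de-duplicated keys have different lengths, mirroring
  // "The number of specified keys is different."
  def checkEquiJoinKeys(first: Seq[Int], second: Seq[Int]): Either[String, Seq[(Int, Int)]] = {
    val k1 = normalize(first)
    val k2 = normalize(second)
    if (k1.size != k2.size)
      Left(s"number of keys differs: ${k1.size} vs ${k2.size}")
    else
      Right(k1.zip(k2)) // pair up the key fields of both sides
  }
}
```

With `Seq(0, 0)` and `Seq(0, 1)` the first key shrinks to `Seq(0)`, so the lengths differ and the check fails, which matches the first stack trace in Felix's message.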

The second error comes from the Table API itself, and the error
message hints at the problem:
Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target
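To see why the OR form cannot be turned into join keys, here is a simplified, hypothetical predicate model (not the Table API's actual expression classes). Only equalities among the top-level AND conjuncts are guaranteed to hold, so only those can serve as equi-join keys.

```scala
// Simplified, hypothetical predicate model; not the Table API's real classes.
object EquiJoinDerivation {
  sealed trait Pred
  final case class Eq(left: String, right: String) extends Pred
  final case class And(a: Pred, b: Pred) extends Pred
  final case class Or(a: Pred, b: Pred) extends Pred

  // Split a predicate into the conjuncts that must all be true.
  def conjuncts(p: Pred): Seq[Pred] = p match {
    case And(a, b) => conjuncts(a) ++ conjuncts(b)
    case other     => Seq(other)
  }

  // Equi-join keys are the top-level conjuncts that are plain equalities.
  // An equality nested under Or is not guaranteed to hold, so it is skipped.
  def deriveKeys(p: Pred): Either[String, Seq[(String, String)]] = {
    val keys = conjuncts(p).collect { case Eq(l, r) => (l, r) }
    if (keys.isEmpty) Left(s"Could not derive equi-join predicates for $p")
    else Right(keys)
  }
}
```

For the AND form, both derived keys use `nodeID` on the left side, which appears to be the duplicated key field behind the first error; for the OR form, no key can be derived at all.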

The problem is that this would have to be executed as a cross (pairing
every row of one table with every row of the other) followed by a
filter: because of the OR, neither predicate is an equi-join predicate
that must always be true. I don't want to allow this, because a cross
can be very expensive. I will open a JIRA ticket for adding a manual
cross operation to the Table API.
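The cost argument can be illustrated with plain Scala collections (no Flink APIs; the case classes are copied from Felix's message). `crossThenFilter` evaluates the OR predicate on every pair, while `hashEquiJoinOnSrc`, a hypothetical single-key equi-join shown only for comparison, visits just the edges whose `src` matches. How Flink actually executes joins is not modelled here.

```scala
// Plain Scala collections only; not Flink code. Case classes mirror the thread.
object CrossThenFilterSketch {
  final case class WeightedEdge(src: Int, target: Int, weight: Double)
  final case class Community(communityID: Int, nodeID: Int)

  // Cross followed by filter: every (community, edge) pair is built and the
  // OR predicate is evaluated on each one.
  def crossThenFilter(communities: Seq[Community],
                      edges: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    for {
      c <- communities
      e <- edges                                   // cross product
      if c.nodeID == e.src || c.nodeID == e.target // filter with the OR predicate
    } yield (c, e)

  // Number of pairs the cross product has to examine: |communities| * |edges|.
  def pairsExamined(communities: Seq[Community], edges: Seq[WeightedEdge]): Long =
    communities.size.toLong * edges.size

  // Hypothetical single-key equi-join for comparison: index edges by src once,
  // then look up only the edges whose src equals the community's nodeID.
  def hashEquiJoinOnSrc(communities: Seq[Community],
                        edges: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] = {
    val bySrc: Map[Int, Seq[WeightedEdge]] = edges.groupBy(_.src)
    for {
      c <- communities
      e <- bySrc.getOrElse(c.nodeID, Seq.empty[WeightedEdge])
    } yield (c, e)
  }
}
```

The cross grows with the product of both input sizes regardless of how few pairs survive the filter, which is why it is worth making such an operation explicit rather than deriving it silently from an OR predicate.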

On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neutatz@googlemail.com> wrote:
> Hi,
>
> I want to join two tables in the following way:
>
> case class WeightedEdge(src: Int, target: Int, weight: Double)
> case class Community(communityID: Int, nodeID: Int)
>
> case class CommunitySumTotal(communityID: Int, sumTotal: Double)
>
> val communities: DataSet[Community]
> val weightedEdges: DataSet[WeightedEdge]
>
> val communitiesTable = communities.toTable
> val weightedEdgesTable = weightedEdges.toTable
>
> val sumTotal = communitiesTable.join(weightedEdgesTable)
>  .where("nodeID = src && nodeID = target")
>  .groupBy('communityID)
>  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
>
>
> but I get this exception:
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
>     at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
>     at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>
> Moreover when I use the following where clause:
>
> .where("nodeID = src || nodeID = target")
>
> I get another error:
>
> Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
>     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>
>
> Apart from that, the Table API seems really promising. It's a really great tool.
>
> Thank you for your help,
>
> Felix
