flink-dev mailing list archives

From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: TableAPI - Join on two keys
Date Fri, 17 Apr 2015 08:08:56 GMT
Why not do two separate joins, union the results, and then apply a distinct
operation on the combined key?
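Till's suggestion can be sketched as follows. This is a minimal sketch that uses plain Scala `List`s in place of Flink `DataSet`s so it runs without a cluster. The case classes are copied from Felix's message quoted below; the object and method names (`TwoJoinsUnionDistinct`, `crossThenFilter`, `twoJoinsUnionDistinct`, `sumTotals`) are hypothetical. `crossThenFilter` spells out the literal meaning of the `||` predicate (the cross followed by a filter that the Table API declines to plan), so the two plans can be compared.

```scala
// Plain Scala collections stand in for Flink DataSets so this runs standalone.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

object TwoJoinsUnionDistinct {
  type Row = (Community, WeightedEdge)

  // Reference semantics of 'nodeID === 'src || 'nodeID === 'target:
  // a cross product followed by a filter.
  def crossThenFilter(cs: List[Community], es: List[WeightedEdge]): List[Row] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c, e)

  // Two equi-joins, unioned, then deduplicated on the combined (community, edge)
  // row, so a self-loop matched by both joins is kept only once.
  def twoJoinsUnionDistinct(cs: List[Community], es: List[WeightedEdge]): List[Row] = {
    val bySrc    = for { c <- cs; e <- es if c.nodeID == e.src } yield (c, e)
    val byTarget = for { c <- cs; e <- es if c.nodeID == e.target } yield (c, e)
    (bySrc ++ byTarget).distinct
  }

  // Mirrors groupBy('communityID) followed by weight.sum in the original query.
  def sumTotals(rows: List[Row]): List[CommunitySumTotal] =
    rows
      .groupBy { case (c, _) => c.communityID }
      .map { case (id, group) => CommunitySumTotal(id, group.map(_._2.weight).sum) }
      .toList
      .sortBy(_.communityID)

  def main(args: Array[String]): Unit = {
    val communities = List(Community(1, 10), Community(1, 11), Community(2, 12))
    val edges = List(
      WeightedEdge(10, 11, 1.0),
      WeightedEdge(11, 12, 2.0),
      WeightedEdge(12, 12, 4.0) // self-loop: matched by both joins
    )
    val viaUnion = twoJoinsUnionDistinct(communities, edges)
    println(sumTotals(viaUnion)) // List(CommunitySumTotal(1,4.0), CommunitySumTotal(2,6.0))
    // Same rows as the cross + filter plan, ignoring order.
    println(viaUnion.toSet == crossThenFilter(communities, edges).toSet) // true
  }
}
```

Without the `distinct`, the self-loop edge would be counted twice and community 2 would sum to 10.0 instead of 6.0. The deduplication assumes input rows are unique: if the edge set contains exact duplicate edges, `distinct` would merge them, whereas the cross-and-filter plan keeps them. In Flink itself the corresponding DataSet operations are `join(...).where(...).equalTo(...)`, `union`, and `distinct`; the exact field-selection syntax depends on the Flink version.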

On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> So, the first issue is a "feature" of the Java API that removes
> duplicate fields from keys: an equi-join of key (0,0) with key (0,1)
> throws an error because one 0 is removed from the first key, leaving
> two keys of different lengths.
>
> The second issue is deliberate Table API behavior, and the error
> message hints at the problem:
> Could not derive equi-join predicates for predicate 'nodeID === 'src
> || 'nodeID === 'target
>
> The problem is that this would have to be executed as a cross
> followed by a filter, because neither predicate is an equi-join
> predicate that must always hold (because of the OR relation). I
> don't want to allow this, because a cross can be very expensive. I will
> add a JIRA ticket for adding a manual cross operation to the Table
> API.
>
> On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neutatz@googlemail.com>
> wrote:
> > Hi,
> >
> > I want to join two tables in the following way:
> >
> > case class WeightedEdge(src: Int, target: Int, weight: Double)
> > case class Community(communityID: Int, nodeID: Int)
> >
> > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> >
> > val communities: DataSet[Community]
> > val weightedEdges: DataSet[WeightedEdge]
> >
> > val communitiesTable = communities.toTable
> > val weightedEdgesTable = weightedEdges.toTable
> >
> > val sumTotal = communitiesTable.join(weightedEdgesTable)
> >  .where("nodeID = src && nodeID = target")
> >  .groupBy('communityID)
> >  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
> >
> >
> > but I get this exception:
> >
> > Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
> >     at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
> >     at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >
> > Moreover, when I use the following where clause:
> >
> > .where("nodeID = src || nodeID = target")
> >
> > I get another error:
> >
> > Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >
> >
> > Apart from that, the Table API seems really promising. It's a really great
> > tool.
> >
> > Thank you for your help,
> >
> > Felix
>
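Aljoscha's first point accounts for the `&&` case: both conjuncts of `nodeID = src && nodeID = target` use `nodeID`, so once the Java API drops the duplicate field, the left key is just (nodeID) while the right key is still (src, target), which is consistent with the "number of specified keys is different" exception. The conjunction also only matches edges whose src and target are both the community node, i.e. self-loops. A minimal sketch, again on plain Scala collections rather than Flink, checking that the predicate is equivalent to a single-key equi-join on nodeID = src after filtering edges to src == target; `AndPredicateRewrite` and its methods are hypothetical names.

```scala
// Plain Scala collections stand in for Flink DataSets; names are illustrative.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

object AndPredicateRewrite {
  type Row = (Community, WeightedEdge)

  // Literal reading of nodeID = src && nodeID = target.
  def twoKeyJoin(cs: List[Community], es: List[WeightedEdge]): List[Row] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src && c.nodeID == e.target
    } yield (c, e)

  // Same result with a single join key (nodeID = src) after filtering the
  // edge side to src == target, so the left key no longer repeats nodeID.
  def oneKeyJoinThenFilter(cs: List[Community], es: List[WeightedEdge]): List[Row] =
    for {
      c <- cs
      e <- es.filter(e => e.src == e.target)
      if c.nodeID == e.src
    } yield (c, e)

  def main(args: Array[String]): Unit = {
    val cs = List(Community(1, 10), Community(2, 12))
    val es = List(WeightedEdge(10, 11, 1.0), WeightedEdge(12, 12, 4.0))
    println(twoKeyJoin(cs, es)) // only the self-loop edge matches
    println(twoKeyJoin(cs, es) == oneKeyJoinThenFilter(cs, es))
  }
}
```

If the intended query was instead "edges touching the community node at either end", that is the `||` predicate, which Till's union-of-two-joins suggestion addresses.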
