flink-dev mailing list archives

From Felix Neutatz <neut...@googlemail.com>
Subject Re: TableAPI - Join on two keys
Date Fri, 17 Apr 2015 08:20:46 GMT
I am also against the manual cross method. Isn't the idea of the Table
API to hide the actual implementation from the user?

Best regards,
Felix
On 17.04.2015 at 10:09 AM, "Till Rohrmann" <till.rohrmann@gmail.com> wrote:

> Why not do two separate joins, union the results, and then apply a
> distinct operation on the combined key?
>
> On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > So, the first thing is a "feature" of the Java API that removes
> > duplicate fields in keys, so an equi-join on (0,0) with (0,1) would
> > throw an error because one 0 is removed from the first key.
> >
> > The second thing is a feature of the Table API where the error message
> > is hinting at the problem:
> > Could not derive equi-join predicates for predicate 'nodeID === 'src
> > || 'nodeID === 'target
> >
> > The problem is that this would have to be executed as a cross
> > followed by a filter, because neither predicate is an equi-join
> > predicate that must always be true (because of the OR relation). I
> > don't want to allow this, because a cross can be very expensive. I will
> > add a JIRA ticket for adding a manual cross operation to the Table
> > API.
> >
> > On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neutatz@googlemail.com>
> > wrote:
> > > Hi,
> > >
> > > I want to join two tables in the following way:
> > >
> > > case class WeightedEdge(src: Int, target: Int, weight: Double)
> > > case class Community(communityID: Int, nodeID: Int)
> > >
> > > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> > >
> > > val communities: DataSet[Community]
> > > val weightedEdges: DataSet[WeightedEdge]
> > >
> > > val communitiesTable = communities.toTable
> > > val weightedEdgesTable = weightedEdges.toTable
> > >
> > > val sumTotal = communitiesTable.join(weightedEdgesTable)
> > >  .where("nodeID = src && nodeID = target")
> > >  .groupBy('communityID)
> > >  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
> > >
> > >
> > > but I get this exception:
> > >
> > > Exception in thread "main"
> > > org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
> > > at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
> > > at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> > > at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> > > at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> > >
> > > Moreover when I use the following where clause:
> > >
> > > .where("nodeID = src || nodeID = target")
> > >
> > > I get another error:
> > >
> > > Exception in thread "main"
> > > org.apache.flink.api.table.ExpressionException: Could not derive
> > > equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
> > > 'target.
> > >
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> > > at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> > > at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> > >
> > >
> > > Apart from that, the Table API seems really promising. It's a really
> > > great tool.
> > >
> > > Thank you for your help,
> > >
> > > Felix
> >
>
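
To make Till's suggestion in the thread concrete, here is a minimal sketch of why two separate equi-joins, a union, and a distinct give the same rows as a cross followed by a filter on the OR predicate. It uses plain Scala collections as a stand-in for the tables, not the Flink Table API. The names OrJoinSketch, crossThenFilter, equiJoin and twoJoinsUnionDistinct are made up for illustration. It also assumes the inputs contain no exact duplicate rows, since distinct would collapse those as well.

```scala
// Case classes copied from the original question.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

// Hypothetical helper object; plain Scala collections only, no Flink calls.
object OrJoinSketch {

  // Reference semantics: compare every community with every edge (a cross)
  // and keep the pairs that satisfy the OR predicate.
  def crossThenFilter(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c, e)

  // Single-key equi-join: index communities by nodeID, then look up each
  // edge by the chosen edge key (src or target).
  def equiJoin(cs: Seq[Community], es: Seq[WeightedEdge],
               edgeKey: WeightedEdge => Int): Seq[(Community, WeightedEdge)] = {
    val byNode: Map[Int, Seq[Community]] = cs.groupBy(_.nodeID)
    for {
      e <- es
      c <- byNode.getOrElse(edgeKey(e), Seq.empty)
    } yield (c, e)
  }

  // Two equi-joins, unioned. The distinct removes pairs found by both joins,
  // which happens for edges with src == target.
  def twoJoinsUnionDistinct(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    (equiJoin(cs, es, _.src) ++ equiJoin(cs, es, _.target)).distinct
}
```

On small inputs, OrJoinSketch.crossThenFilter(cs, es).toSet and OrJoinSketch.twoJoinsUnionDistinct(cs, es).toSet should be equal. The difference is in cost: the cross compares every pair of rows, while each equi-join only looks up matching keys.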

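The first error in the thread (the AND predicate) can be illustrated in the same spirit. The sketch below is only a model of the duplicate-key removal Aljoscha describes, not Flink's actual code. It assumes nodeID sits at field position 1 of Community, and src and target at positions 0 and 1 of WeightedEdge. The names KeyDedupSketch, dedupKeys, keyCountsMatch and andJoin are hypothetical. The last function shows one possible rewrite: "nodeID = src && nodeID = target" is logically the same as "nodeID = src && src = target", so a single-key join combined with a filter returns the same rows.

```scala
// Case classes copied from the original question.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

// Hypothetical model; plain Scala collections only, no Flink calls.
object KeyDedupSketch {

  // Model of the "feature" described in the thread: duplicate key
  // positions are dropped from a key specification.
  def dedupKeys(positions: Seq[Int]): Seq[Int] = positions.distinct

  // After deduplication both sides must still have the same number of keys.
  def keyCountsMatch(left: Seq[Int], right: Seq[Int]): Boolean =
    dedupKeys(left).length == dedupKeys(right).length

  // One possible rewrite of the AND join: keep only edges with
  // src == target, then do a single-key equi-join on nodeID == src.
  def andJoin(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] = {
    val selfLoops = es.filter(e => e.src == e.target)
    val byNode: Map[Int, Seq[Community]] = cs.groupBy(_.nodeID)
    for {
      e <- selfLoops
      c <- byNode.getOrElse(e.src, Seq.empty)
    } yield (c, e)
  }
}
```

Under these assumptions, the left key (1, 1) shrinks to one position while the right key (0, 1) keeps two, which matches the message "The number of specified keys is different." Whether the equivalent rewrite is accepted by the Table API version discussed here is not confirmed by the thread.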