flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Hi, join with two columns of both tables
Date Mon, 09 Nov 2015 12:58:38 GMT
Why don't you use a composite key for the Flink join
(first.join(second).where(0,1).equalTo(2,3).with(...))?
This would be more efficient, and you could omit the check in the join
function.
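
For the case classes in your snippet, a minimal sketch of the composite-key join could look like the following. It assumes the Scala DataSet API and uses key-selector functions that return a tuple of both columns. The case class definitions and the sample rows are hypothetical and only mirror the field names from your code.

```scala
import org.apache.flink.api.scala._

// Hypothetical record types that mirror the field names used in the question.
case class StoreReturn(_item_sk: Long, _customer_sk: Long, _ticket_number: Long)
case class WebSales(_item_sk: Long, _bill_customer_sk: Long)

object CompositeKeyJoin {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data: only the first row of each set agrees on both columns.
    val storeReturn = env.fromElements(
      StoreReturn(1L, 10L, 100L),
      StoreReturn(2L, 20L, 200L))
    val webSales = env.fromElements(
      WebSales(1L, 10L),
      WebSales(2L, 99L))

    val srJoinWs = storeReturn.join(webSales)
      // Composite key: both selectors return a (Long, Long) tuple.
      .where(sr => (sr._item_sk, sr._customer_sk))
      .equalTo(ws => (ws._item_sk, ws._bill_customer_sk))
      // Explicit apply, so no equality check is needed inside the join function.
      .apply { (sr, ws) => (sr._item_sk, sr._customer_sk, sr._ticket_number) }

    srJoinWs.print()
  }
}
```

With this sample data, only the pair that matches on both item and customer should be joined. Calling .apply explicitly also appears to avoid the type error: in the original form, the block written directly after equalTo(...) seems to be taken as the implicit TypeInformation argument of equalTo rather than as the join function.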

Best, Fabian

2015-11-08 19:13 GMT+01:00 Philip Lee <philjjoon@gmail.com>:

> I want to join two tables on two columns, like this:
>
> //    AND sr_customer_sk      = ws_bill_customer_sk
> //    AND sr_item_sk          = ws_item_sk
>
> val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){
>     (storeReturn: StoreReturn, webSales: WebSales, out: Collector[(Long,Long,Long)]) =>
>       if(storeReturn._customer_sk.equals(webSales._bill_customer_sk))
>         out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number)
>       else
>         None
> }
>
> According to the explanation of the join phase, this is how I should do it if I want to
> join this way. Isn't that right?
>
> But it does not work because of a type mismatch: expected TypeInformation[Long],
> actual (StoreReturn, WebSales, Collector[(Long,Long,Long)]) => Any
>
> I tried many ways but it still does not work.
>
> Any suggestion?
>
> Best Regards,
>
> Phil
>
>
>
