flink-user mailing list archives

From Adrienne Kole <adrienneko...@gmail.com>
Subject Re: Keyed join Flink Streaming
Date Thu, 13 Oct 2016 09:46:29 GMT
Hi Ufuk,
Thanks for the reply.

The example is at [1]. I have a few questions:

If there is no difference between a KeyedStream-KeyedStream join by key and
a DataStream-DataStream join, then the DataStream effectively becomes a
KeyedStream through the `where` and `equalTo` clauses. Please correct me if
I am wrong.


Is the execution of windowed joins in Flink reduced to only one machine
in the cluster? I ask because its throughput is quite low compared to
other operations.



[1]
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams


Thanks
Adrienne

On Thu, Oct 13, 2016 at 10:59 AM Ufuk Celebi <uce@apache.org> wrote:

Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <adriennekole1@gmail.com>
wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on  key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed)
>    onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs.
Do you remember where you found this? Would make sense to remove it.
:-)

You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide the
keyed stream join API though. If you like, you can open a JIRA issue
for it (you would need to tell me your JIRA ID so I can add you as a
contributor).

> val firstKeyed: KeyedStream[MyType] = ...
> val secondKeyed: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)

You only need the KeyedStream type if you need an operation that is
specific to KeyedStream. There is no difference execution-wise
between the two examples.
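
For reference, the join-where-equalTo-window-apply form could look roughly
like the sketch below. It assumes the Flink 1.x Scala DataStream API; the
case class fields, the sample elements, the 5-second tumbling
processing-time window and the job name are placeholders chosen for
illustration, not taken from the original example.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record types; only the key fields (userId, id) matter for the join.
case class MyType(userId: Long, name: String)
case class AnotherType(id: Long, value: String)

object WindowedJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder sources; a real job would read from Kafka, sockets, etc.
    val firstInput: DataStream[MyType] =
      env.fromElements(MyType(1L, "a"), MyType(2L, "b"))
    val secondInput: DataStream[AnotherType] =
      env.fromElements(AnotherType(1L, "x"), AnotherType(2L, "y"))

    // No explicit keyBy is needed: where/equalTo supply the join keys.
    val result: DataStream[(MyType, AnotherType)] =
      firstInput.join(secondInput)
        .where(_.userId)   // key of the first stream
        .equalTo(_.id)     // key of the second stream
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .apply((left: MyType, right: AnotherType) => (left, right))

    result.print()
    env.execute("windowed-join-sketch")
  }
}
```

With bounded placeholder sources like these, the job may finish before a
processing-time window fires, so this is meant to show the shape of the
API rather than to produce output.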

– Ufuk
