flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: dynamically partitioned stream
Date Fri, 01 Sep 2017 15:59:39 GMT
Hi Martin,

I think with those requirements this is very hard (or maybe impossible) to do efficiently
in a distributed setting. It might be that I'm misunderstanding things but let's look at an
example. Assume that initially, we don't have any lambdas, so data can be sent to any machine
because it doesn't matter where they go. Now, we get a new lambda f2 [A, C]. Say this gets
routed to machine "0". Now this means that messages with keys A and C also need to be routed
to machine "0". Now, we get a new lambda f1 [D, E], say this gets routed to machine "2", meaning
that messages with key D and E are also routed to machine "2".

Then, we get a new lambda f3 [C, D]. Do we now re-route all previous lambdas and inputs to
different machines? They all have to go to the same machine, but which one? I'm currently
thinking that there would need to be some component that does the routing, but this has to
be global, so it's hard to do in a distributed setting.
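To make the dilemma concrete, here is a tiny Python sketch (all names are mine and hypothetical, this is not Flink code): a greedy router pins each new lambda's argument keys to one machine, and reports a conflict as soon as a lambda needs keys that are already pinned to different machines.

```python
# Hypothetical greedy router: pin each lambda's argument keys to one machine,
# and report a conflict once a lambda needs keys pinned to different machines.
def route(lambdas, pick_machine):
    key_to_machine = {}                    # key -> machine it is pinned to
    plan = {}                              # lambda name -> machine
    for name, arg_keys in lambdas:
        pinned = {key_to_machine[k] for k in arg_keys if k in key_to_machine}
        if len(pinned) > 1:                # keys span machines: stuck without
            return plan, (name, sorted(pinned))  # globally re-routing keys
        machine = pinned.pop() if pinned else pick_machine(name)
        plan[name] = machine
        for k in arg_keys:
            key_to_machine[k] = machine
    return plan, None

# f2 -> machine 0 pins A and C; f1 -> machine 2 pins D and E; f3 then needs
# C (on machine 0) and D (on machine 2), so no single machine works for it.
plan, conflict = route(
    [("f2", ["A", "C"]), ("f1", ["D", "E"]), ("f3", ["C", "D"])],
    pick_machine={"f2": 0, "f1": 2}.get,
)
print(plan)      # {'f2': 0, 'f1': 2}
print(conflict)  # ('f3', [0, 2])
```

The only way out at that point is to move already-pinned keys, which is exactly the global re-routing decision that is hard to make in a distributed setting.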

What do you think?

Best,
Aljoscha

> On 1. Sep 2017, at 07:17, Martin Eden <martineden131@gmail.com> wrote:
> 
> This might be a way forward, but since side inputs are not available yet I will try to key the control stream by the keys in the first co-flat-map.
> 
> I'll see how it goes.
> 
> Thanks guys,
> M
> 
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920430@gmail.com> wrote:
> Hi Martin,
> 
> Yes, that is exactly what I thought. 
> But the first step also needs to be fulfilled by SideInput. I'm not sure how to achieve this in the current release.
> 
> Best,
> Tony Wei
> 
> Martin Eden <martineden131@gmail.com> wrote on Thursday, August 31, 2017 at 11:32 PM:
> Hi Aljoscha, Tony,
> 
> Aljoscha: 
> Yes it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be applied each time a new value for either A, B, or C comes in, so we need to use state to cache the latest values. Using the example data stream in my first msg, the emitted stream should be:
> 
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C      V6        6
> B      V6        6
> A      V5        5
> A      V4        4
> C      V3        3
> A      V3        3
> B      V3        3
> B      V2        2
> A      V1        1
> 
> 2. Control Stream:
> Lambda  ArgumentKeys TIME
> .
> .
> .
> f2            [A, C]                 4
> f1            [A, B, C]            1
> 
> 3. Expected emitted stream:
> TIME    VALUE 
> .
> .
> .
> 6          f1(V5, V6, V3)
>             f1(V5, V6, V6)
>             f2(V5, V6)
> 5          f1(V5, V3, V3)
>             f2(V5, V3)
> 4          f1(V4, V3, V3)
>             f2(V4, V3)
> 3          f1(V3, V3, V3)
> 2          -
> 1          -
> 
> So essentially, as soon as the argument list fills up, we apply the function/lambda at each newly arriving message in the data stream for any of its argument keys.
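For what it's worth, these semantics can be pinned down with a small Flink-free Python simulation (the names are mine, and this is a sketch of the intended behavior, not an implementation proposal): cache the latest value per key, register lambdas as they arrive on the control stream, and re-apply every lambda whose argument keys are all cached whenever one of those keys gets a new value. Run on the two example streams, it reproduces the expected emitted stream above.

```python
def simulate(data_events, control_events):
    """data_events: (time, key, value); control_events: (time, name, arg_keys)."""
    # Merge the two streams by time; at equal timestamps, apply the control
    # message before the data message (kind 0 sorts before kind 1).
    merged = sorted(
        [(t, 0, (name, keys)) for t, name, keys in control_events]
        + [(t, 1, (key, value)) for t, key, value in data_events],
        key=lambda e: (e[0], e[1]),
    )
    latest = {}       # key -> latest value seen so far
    lambdas = []      # (name, arg_keys), in registration order
    emitted = []      # (time, "f1(V5, V6, V3)") pairs
    for time, kind, payload in merged:
        if kind == 0:                       # control message: register lambda
            lambdas.append(payload)
        else:                               # data message: update cache, fire
            key, value = payload
            latest[key] = value
            for name, arg_keys in lambdas:
                if key in arg_keys and all(k in latest for k in arg_keys):
                    args = ", ".join(latest[k] for k in arg_keys)
                    emitted.append((time, f"{name}({args})"))
    return emitted

data = [(1, "A", "V1"), (2, "B", "V2"), (3, "B", "V3"), (3, "A", "V3"),
        (3, "C", "V3"), (4, "A", "V4"), (5, "A", "V5"), (6, "B", "V6"),
        (6, "C", "V6")]
ctrl = [(1, "f1", ["A", "B", "C"]), (4, "f2", ["A", "C"])]
out = simulate(data, ctrl)
```

Nothing is emitted at times 1 and 2 because f1's argument list is not yet full; from time 3 onward every new value for a cached argument key re-fires the matching lambdas, exactly as in the table above.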
> 
> Tony:
> Yes we need to group by and pass to the lambda.
> Ok, so what you are proposing might work. So your solution assumes that we have to connect with the control stream twice? Once for the tagging, and another time re-connecting the control stream with the tagged stream for the actual application of the function/lambda?
> 
> Thanks,
> Alex
> 
> 
> 
> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi Martin,
> 
> In your original example, what does this syntax mean exactly:
> 
> f1            [A, B, C]            1
> 
> Does it mean that f1 needs one A, one B, and one C from the main stream? If yes, which ones, because there are multiple As and Bs and so on? Or does it mean that f1 can apply to an A or a B or a C? If it's the first, then I think it's quite hard to find a partitioning such that f1, f2, and all As, Bs, and Cs go to the same machine.
> 
> Best,
> Aljoscha
> 
>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920430@gmail.com> wrote:
>> 
>> Hi Martin,
>> 
>> So the problem is that you want to group those arguments in the Data Stream and pass them to the lambda function from the Control Stream at the same time. Am I right?
>> 
>> If so, you could give each lambda function an id as well, and use these ids to tag the arguments they belong to.
>> After that, keyBy could be used to group the arguments belonging to the same lambda function. Joining this stream with the Control Stream by function id would put the arguments and the function in the same instance.
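A rough Python mock-up of that tagging idea (plain collections instead of the Flink API; all names are hypothetical): fan each data record out once per lambda id that lists its key, then group by lambda id, which is roughly what a keyBy on the id would do, so every function meets all of its arguments in one place.

```python
def tag_by_function(records, functions):
    """records: (key, value) pairs; functions: {fn_id: [argument_keys]}."""
    tagged = []
    for key, value in records:
        for fn_id, arg_keys in functions.items():
            if key in arg_keys:
                tagged.append((fn_id, key, value))   # duplicate per matching fn
    return tagged

def group_by_function(tagged):
    """Stand-in for a keyBy on fn_id: collect each function's latest arguments."""
    groups = {}
    for fn_id, key, value in tagged:
        groups.setdefault(fn_id, {})[key] = value    # latest value per key wins
    return groups

functions = {"f1": ["A", "B", "C"], "f2": ["A", "C"]}
records = [("A", "V1"), ("B", "V2"), ("C", "V3")]
groups = group_by_function(tag_by_function(records, functions))
print(groups)  # {'f1': {'A': 'V1', 'B': 'V2', 'C': 'V3'}, 'f2': {'A': 'V1', 'C': 'V3'}}
```

Note the cost this makes explicit: a record whose key appears in n lambdas is duplicated n times before the grouping step.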
>> 
>> What do you think? Could this solution solve your problem?
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden131@gmail.com>:
>> Thanks for your reply Tony,
>> 
>> Yes we are in the latter case, where the functions/lambdas come in the control stream.
Think of them as strings containing the logic of the function. The values for each of the
arguments to the function come from the data stream. That is why we need to co-locate the
data stream messages for the corresponding keys with the control message that has the function
to be applied.
>> 
>> We have a way of interpreting the logic described in the string and executing it on the incoming values from the data stream. This is kicked off from within the Flink runtime (synchronously, from the flatMap of the RichCoFlatMapFunction) but is not using Flink predefined operators or functions.
>> 
>> So yeah, I see your point about mapping the arguments, but the problem is not really that; the problem is making sure that the values from the data stream end up in the same task instance / keyed managed state as the actual control stream message. Once they are there, we can pass them in.
>> 
>> Any other thoughts?
>> 
>> M
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920430@gmail.com> wrote:
>> Hi Martin,
>> 
>> About problem 2. How were those lambda functions created? Pre-defined functions /
operators or automatically generated based on the message from Control Stream?
>> 
>> For the former, you could give each function an id and use flatMap to duplicate the data with multiple ids. Then you could use a filter function and send them to the corresponding operators.
>> 
>> For the general case like the latter, because you had broadcast the messages to all tasks, each sub-task could always build a mapping table from argument keys to lambda functions and use that map to process the data. But I was wondering if it is possible to generate a completely new function at runtime.
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden131@gmail.com>:
>> Thanks for your reply Tony.
>> 
>> So there are actually 2 problems to solve:
>> 
>> 1. All control stream msgs need to be broadcasted to all tasks.
>> 
>> 2. The data stream messages with the same keys as those specified in the control
message need to go to the same task as well, so that all the values required for the lambda
(i.e. functions f1, f2 ...) are there.
>> 
>> In my understanding side inputs (which are actually not available in the current
release) would address problem 1.
>> 
>> To address problem 1 I also tried dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new
RichCoFlatMapFunction) but I get a runtime exception telling me I still need to do a keyBy
before the flatMap. So are the upcoming side inputs the only way to broadcast a control stream
to all tasks of a coFlatMap? Or is there another way?
>> 
>> As for problem 2, I am still pending a reply. Would appreciate if anyone has some
suggestions.
>> 
>> Thanks,
>> M
>> 
>> 
>> 
>> 
>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920430@gmail.com> wrote:
>> Hi Martin,
>> 
>> Let me understand your question first.
>> You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set obtained from the Control Stream.
>> 
>> If I'm not misunderstanding your question, I think SideInput is what you want.
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>> It lets you define one stream as a SideInput that can be attached to the other stream; the data in the SideInput stream will then be broadcast.
>> 
>> So far, I have no idea whether there is any way to solve this without SideInput.
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden131@gmail.com>:
>> Hi all,
>> 
>> I am trying to implement the following using Flink:
>> 
>> I have 2 input message streams:
>> 
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C      V6        6
>> B      V6        6
>> A      V5        5
>> A      V4        4
>> C      V3        3
>> A      V3        3
>> B      V3        3
>> B      V2        2
>> A      V1        1
>> 
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2            [A, C]                 4
>> f1            [A, B, C]            1
>> 
>> I want to apply the lambdas coming in the control stream to the selection of keys
that are coming in the data stream.
>> 
>> Since we have 2 streams, I naturally thought of connecting them using .connect. For this I need to key both of them by a certain criterion. And here lies the problem: how can I make sure the messages with keys A, B, C specified in the control stream end up in the same task as the control message (f1, [A, B, C]) itself? Basically, I don't know how to key by to achieve this.
>> 
>> I suspect a custom partitioner is required that partitions the data stream based
on the messages in the control stream? Is this even possible?
>> 
>> Any suggestions welcomed!
>> 
>> Thanks,
>> M
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
> 

