flink-user mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Regarding Broadcast of datasets in streaming context
Date Wed, 04 May 2016 09:02:27 GMT
Hi,

Iterating after every incoming point/centroid update means that you
basically defeat the purpose of having parallelism in your Flink job.

If you only "sync" the centroids periodically by the broadcast you can make
your program run efficiently in parallel. This should be fine for machine
learning use-cases where the results should converge anyways.
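
To make the periodic sync concrete, here is a minimal, Flink-free sketch in
Java. Everything in it is an assumption made for illustration: the names
CentroidWorker and Delta, the 1-D points, and the merge rule (each worker
broadcasts only the sums and counts it has accumulated since its last
emission, and every worker adds every broadcast delta to its merged view).
onPoint and onDelta only mimic the two inputs of a co-flatmap; this is one
possible way to do it, not the code of the original example.

```java
/** Hypothetical, Flink-free sketch; 1-D points keep the arithmetic short. */
class CentroidWorker {
    /** Per-centroid sums and counts accumulated since the last emission. */
    static final class Delta {
        final double[] sums;
        final long[] counts;
        Delta(int k) { sums = new double[k]; counts = new long[k]; }
    }

    private final double[] sums;   // merged view: sum of points per centroid
    private final long[] counts;   // merged view: number of points per centroid
    private Delta pending;         // local updates that were not broadcast yet
    private final int syncEvery;   // emit a delta after this many local points
    private int sinceSync = 0;

    CentroidWorker(double[] initialCentroids, int syncEvery) {
        // Assumption: each initial centroid counts as one already-seen point.
        this.sums = initialCentroids.clone();
        this.counts = new long[initialCentroids.length];
        java.util.Arrays.fill(this.counts, 1L);
        this.pending = new Delta(initialCentroids.length);
        this.syncEvery = syncEvery;
    }

    /** Position of centroid i, including local updates that are still pending. */
    double centroid(int i) {
        return (sums[i] + pending.sums[i]) / (counts[i] + pending.counts[i]);
    }

    /** Point input: update locally; return a Delta every syncEvery points, else null. */
    Delta onPoint(double p) {
        int nearest = 0;
        for (int i = 1; i < sums.length; i++) {
            if (Math.abs(p - centroid(i)) < Math.abs(p - centroid(nearest))) {
                nearest = i;
            }
        }
        pending.sums[nearest] += p;
        pending.counts[nearest]++;
        if (++sinceSync < syncEvery) {
            return null;           // keep working locally, nothing is sent
        }
        sinceSync = 0;
        Delta out = pending;
        pending = new Delta(sums.length);
        return out;
    }

    /** Feedback input: fold a broadcast Delta (own or foreign) into the merged view. */
    void onDelta(Delta d) {
        for (int i = 0; i < sums.length; i++) {
            sums[i] += d.sums[i];
            counts[i] += d.counts[i];
        }
    }

    public static void main(String[] args) {
        double[] start = {0.0, 10.0};
        CentroidWorker a = new CentroidWorker(start, 2);
        CentroidWorker b = new CentroidWorker(start, 2);
        a.onPoint(1.0);                // below the sync interval: returns null
        Delta fromA = a.onPoint(3.0);  // second point: a delta is emitted
        b.onPoint(9.0);
        Delta fromB = b.onPoint(11.0);
        // "Broadcast": every worker receives every emitted delta.
        for (CentroidWorker w : new CentroidWorker[] {a, b}) {
            w.onDelta(fromA);
            w.onDelta(fromB);
        }
        System.out.println(a.centroid(0) + " " + a.centroid(1));
        System.out.println(b.centroid(0) + " " + b.centroid(1));
    }
}
```

Between two syncs the workers never talk to each other, which is what keeps
them parallel; once both deltas have been delivered, both workers hold the
same centroids again. A larger syncEvery means less feedback traffic at the
cost of staler centroids in between.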

Gyula

Biplob Biswas <revolutionisme@gmail.com> wrote (on Mon, May 2, 2016, 17:02):

> Hi Gyula,
>
> Could you explain a bit why I wouldn't want the centroids to be collected
> after every point?
>
> I mean, once I get a streamed point via the map1 function, I would want to
> compare the distance of the point with a centroid which arrives via the map2
> function, and I keep on comparing for every centroid which comes in
> subsequently. Once the update of the centroid happens, shouldn't I collect
> the entire set, thus updating a centroid and collecting it back for the
> next point in the iteration?
>
> I may not be getting the concept properly here, so an example snippet would
> help in the long run.
>
> Thanks & Regards
> Biplob
> Gyula Fóra wrote
> > Hey,
> >
> > I think you got the right idea :)
> >
> > So your coflatmap will get all the centroids that you have sent to the
> > stream in the closeWith call. This means that whenever you collect a new
> > set of centroids they will be iterated back. This means you don't always
> > want to send the centroids out on the collector, only periodically.
> >
> > The order in which these come is pretty much arbitrary so you need to make
> > sure to add some logic by which you can order it if this is important.
> >
> > I'm not sure if this helped or not :D
> >
> > Gyula
> >
> > Biplob Biswas <revolutionisme@> wrote (on Mon, May 2, 2016, 13:13):
> >
> >> Hi Gyula,
> >>
> >> I understand more now how this thing might work and it's fascinating.
> >> Although I still have one question about the coflatmap function.
> >>
> >> First, let me explain what I understand and whether it's correct or not:
> >> 1. The connected iterative stream ensures that the coflatmap function
> >> receives the points and the centroids which are broadcast on each
> >> iteration defined by closeWith.
> >>
> >> 2. So in the coflatmap function, on one map I get the points and on the
> >> other map function I get the centroids which are broadcast.
> >>
> >> Now comes the part where I am assuming a bit, because I don't understand
> >> it from the theory.
> >> 3. Assuming I can use the broadcast centroids, I calculate the nearest
> >> centroid to the streamed point, update that centroid, and use only one
> >> of the collectors to return the updated centroid list.
> >>
> >>
> >> The question here is: I am assuming that this operation is not done in
> >> parallel, because if streams are sent in parallel, how would I ensure a
> >> correct update of the centroids when multiple points can try to update
> >> the same centroid in parallel?
> >>
> >> I hope I made myself clear with this.
> >>
> >> Thanks and Regards
> >> Biplob
> >> Biplob Biswas wrote
> >> > Hi Gyula,
> >> >
> >> > I read your workaround and started reading about Flink iterations,
> >> > coflatmap operators and other things. Now, I do understand a few things
> >> > but the solution you provided is not completely clear to me.
> >> >
> >> > I understand the following things from your post.
> >> > 1. You initially have a datastream of points, on which you iterate, and
> >> > 'withFeedbackType' defines the type of the connected stream, so rather
> >> > than "Points" the type is "Centroids" now.
> >> >
> >> > 2. On this connected stream (which, as I understand it, only has the
> >> > streamed points right now), you run a flat map operator. And you mention:
> >> >
> >> > "MyCoFlatmap would be a CoFlatMapFunction which on one input receives
> >> > events and updates its local centroids (and periodically outputs the
> >> > centroids), and on the other input receives the centroids of the other
> >> > flatmaps and merges them into the local ones."
> >> >
> >> > I don't understand this part completely. If I am not wrong, you are
> >> > saying that the co flatmap function would have 2 map functions. What
> >> > specifically am I doing in each map function?
> >> >
> >> > 3. Lastly, the updated centroids which came back from the coflatmap
> >> > function are fed back to the stream again, and this is the part where I
> >> > get lost again. How is this centroid fed back, and if it is fed back,
> >> > what happens to the point stream? And if it somehow is fed back, how do
> >> > I catch it in the coflatmap function?
> >> >
> >> >
> >> > If I understand this a bit, then in your code the first set of centroids
> >> > is created in the coflatmap function and you don't already have a list
> >> > of centroids to start with? Am I assuming that correctly?
> >> >
> >> > I went through the iteration in the KMeans example at the following
> >> > link:
> >> >
> >> > https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
> >> >
> >> > and I understand how this is working, but I am still not clear how your
> >> > example is working.
> >> >
> >> > Could you please explain it a bit more? With some examples maybe?
> >> >
> >> > Thanks a lot.
> >> > Gyula Fóra-2 wrote
> >> >> Hi Biplob,
> >> >>
> >> >> I have implemented a similar algorithm as Aljoscha mentioned.
> >> >>
> >> >> First things to clarify are the following:
> >> >> There is currently no abstraction for keeping objects (in your case
> >> >> centroids) in a centralized way that can be updated/read by all
> >> >> operators. This would probably be very costly and is actually not
> >> >> necessary in your case.
> >> >>
> >> >> Broadcasting a stream, in contrast with other partitioning methods,
> >> >> means that the events will be replicated to all downstream operators.
> >> >> This is not a magical operator that will make state available among
> >> >> parallel instances.
> >> >>
> >> >> Now let me explain what I think you want from Flink and how to do it :)
> >> >>
> >> >> You have an input data stream and a set of centroids to be updated based
> >> >> on the incoming records. As you want to do this in parallel you have an
> >> >> operator (let's say a flatmap) that keeps the centroids locally and
> >> >> updates them on its inputs. Now you have a set of independently updated
> >> >> centroids, so you want to merge them and update the centroids in each
> >> >> flatmap.
> >> >>
> >> >> Let's see how to do this. Given that you have your centroids locally,
> >> >> updating them is super easy, so I will not talk about that. The
> >> >> problematic part is periodically merging and "broadcasting" the
> >> >> centroids so all the flatmaps eventually see the same (they don't have
> >> >> to always be the same for clustering probably). There is no operator for
> >> >> sending state (centroids) between subtasks so you have to be clever
> >> >> here. We can actually use cyclic streams to solve this problem by
> >> >> sending the centroids as simple events to a CoFlatMap:
> >> >>
> >> >> DataStream<Point> input = ...;
> >> >> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
> >> >>     input.iterate().withFeedbackType(Centroids.class);
> >> >> DataStream<Centroids> updatedCentroids =
> >> >>     inputsAndCentroids.flatMap(new MyCoFlatmap());
> >> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast());
> >> >>
> >> >> MyCoFlatmap would be a CoFlatMapFunction which on one input receives
> >> >> events and updates its local centroids (and periodically outputs the
> >> >> centroids), and on the other input receives the centroids of the other
> >> >> flatmaps and merges them into the local ones.
> >> >>
> >> >> This might be a lot to take in at first, so you might want to read up on
> >> >> streaming iterations and connected streams before you start.
> >> >>
> >> >> Let me know if this makes sense.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >>
> >> >> Biplob Biswas <revolutionisme@> wrote (on Thu, Apr 28, 2016, 14:41):
> >> >>
> >> >>> That would really be great, any example would help me proceed with my
> >> >>> work.
> >> >>> Thanks a lot.
> >> >>>
> >> >>>
> >> >>> Aljoscha Krettek wrote
> >> >>> > Hi Biplob,
> >> >>> > one of our developers had a stream clustering example a while back.
> >> >>> > It was using a broadcast feedback edge with a co-operator to update
> >> >>> > the centroids. I'll directly include him in the email so that he
> >> >>> > will notice and can send you the example.
> >> >>> >
> >> >>> > Cheers,
> >> >>> > Aljoscha
> >> >>> >
> >> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <revolutionisme@> wrote:
> >> >>> >
> >> >>> >> I am pretty new to Flink systems, so can anyone at least give me an
> >> >>> >> example of how the datastream.broadcast() method works? From the
> >> >>> >> documentation I get the following:
> >> >>> >>
> >> >>> >> broadcast()
> >> >>> >> Sets the partitioning of the DataStream so that the output elements
> >> >>> >> are broadcasted to every parallel instance of the next operation.
> >> >>> >>
> >> >>> >> If the output elements are broadcasted, then how are they retrieved?
> >> >>> >> Or maybe I am looking at this method in a completely wrong way?
> >> >>> >>
> >> >>> >> Thanks
> >> >>> >> Biplob Biswas
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>> >> --
> >> >>> >> View this message in context:
> >> >>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html
> >> >>> >> Sent from the Apache Flink User Mailing List archive. mailing list
> >> >>> >> archive at Nabble.com.
> >> >>> >>
