flink-dev mailing list archives

From Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject Re: [gelly] Spargel model rework
Date Wed, 06 Jan 2016 19:01:32 GMT
issue created: https://issues.apache.org/jira/browse/FLINK-3207

If anyone has any other suggestion about the renaming, let me know :)

-V.

On 5 January 2016 at 11:52, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Nice to hear. :D
>
> I think you can go ahead and add the Jira. About the renaming: I also
> think that it would make sense to do it.
> > On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikalavri@gmail.com> wrote:
> >
> > Hello squirrels and happy new year!
> >
> > I'm reviving this thread to share some results and discuss next steps.
> >
> > Using the Either type I was able to get rid of redundant messages and
> > vertex state. During the past few weeks, I have been running experiments,
> > which show that the performance of this "Pregel" model has improved a lot :)
> > In [1], you can see the speedup of GSA and Pregel over Spargel, for SSSP
> > and Connected Components (CC), for the Livejournal (68m edges), Orkut (117m
> > edges) and Wikipedia (340m edges) datasets.
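For readers following along, here is a minimal, hand-rolled sketch of such an Either type and of the workset element it enables. This is only an illustration: Flink's own Either type (added later under FLINK-3002, see further down this thread) differs in its API, and all names and field types here are assumptions.

// Minimal Either sketch (illustrative, not Flink's actual implementation):
// a value that is exactly one of two alternatives.
public abstract class Either<L, R> {

    public static <L, R> Either<L, R> left(L value)  { return new Left<>(value); }
    public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

    public abstract boolean isLeft();

    static final class Left<L, R> extends Either<L, R> {
        final L value;
        Left(L value) { this.value = value; }
        @Override public boolean isLeft() { return true; }
    }

    static final class Right<L, R> extends Either<L, R> {
        final R value;
        Right(R value) { this.value = value; }
        @Override public boolean isLeft() { return false; }
    }
}

// With such a type, a workset element can be, e.g.,
//   Tuple2<Long, Either<Double, Double>>   (vertexId, vertex state OR message)
// so no dummy vertex states or dummy messages have to be emitted.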
> >
> > Regarding next steps, if there are no objections, I will open a Jira for
> > adding a Pregel iteration abstraction to Gelly. The Gelly guide has to be
> > updated to reflect the spectrum of iteration abstractions that we have
> > discussed in this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.
> >
> > I think it might also be a good idea to do some renaming. Currently, we
> > call the Spargel iteration "vertex-centric", which fits the Pregel
> > abstraction better. I propose we rename the Spargel iteration to
> > "scatter-gather" or "signal-collect" (after the system where it was first
> > introduced [2]).
> > Any other ideas?
> >
> > Thanks,
> > -Vasia.
> >
> > [1]:
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
> > [2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48
> >
> > On 11 November 2015 at 11:05, Stephan Ewen <sewen@apache.org> wrote:
> >
> >> See: https://issues.apache.org/jira/browse/FLINK-3002
> >>
> >> On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <sewen@apache.org> wrote:
> >>
> >>> "Either" an "Optional" types are quite useful.
> >>>
> >>> Let's add them to the core Java API.
> >>>
> >>> On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
> >>> vasilikikalavri@gmail.com> wrote:
> >>>
> >>>> Thanks Fabian! I'll try that :)
> >>>>
> >>>> On 10 November 2015 at 22:31, Fabian Hueske <fhueske@gmail.com>
> wrote:
> >>>>
> >>>>> You could implement a Java Either type (similar to Scala's Either)
> >>>>> that either has a Message or the VertexState, and a corresponding
> >>>>> TypeInformation and TypeSerializer that serializes a byte flag to
> >>>>> indicate which of the two types is used.
> >>>>> It might actually make sense to add a generic Either type to the Java
> >>>>> API in general (similar to the Java Tuples, which resemble the Scala
> >>>>> Tuples).
> >>>>>
> >>>>> Cheers, Fabian
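A rough sketch of the byte-flag encoding Fabian describes, written with plain java.io streams for brevity. A real Flink TypeSerializer/TypeInformation pair would implement considerably more than this, so treat it purely as an illustration of the idea; the types here are assumptions.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch only: encode which alternative is present with a single byte, then
// write the payload of that alternative. In Flink this logic would live in a
// TypeSerializer, paired with a matching TypeInformation.
public class EitherEncoding {

    public static void write(DataOutput out, boolean isVertexState,
                             double vertexState, double message) throws IOException {
        if (isVertexState) {
            out.writeByte(0);          // flag: 0 = vertex state
            out.writeDouble(vertexState);
        } else {
            out.writeByte(1);          // flag: 1 = message
            out.writeDouble(message);
        }
    }

    public static boolean readIsVertexState(DataInput in) throws IOException {
        return in.readByte() == 0;     // read the flag back before the payload
    }
}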
> >>>>>
> >>>>> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikalavri@gmail.com>:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> after running a few experiments, I can confirm that putting the
> >>>>>> combiner after the flatMap is indeed more efficient.
> >>>>>>
> >>>>>> I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
> >>>>>> model and the results are the following:
> >>>>>>
> >>>>>> - for SSSP, Spargel is always the slowest, GSA is ~1.2x faster and
> >>>>>> Pregel is ~1.1x faster without combiner, ~1.3x faster with combiner.
> >>>>>> - for Connected Components, Spargel and GSA perform similarly, while
> >>>>>> Pregel is 1.4-1.6x slower.
> >>>>>>
> >>>>>> To start with, this is much better than I expected :)
> >>>>>> However, there is one main shortcoming in my current implementation
> >>>>>> that negatively impacts performance:
> >>>>>> Since the compute function coGroup needs to output both new vertex
> >>>>>> values and new messages, I emit a wrapping tuple that contains both
> >>>>>> vertex state and messages and then filter them out based on a boolean
> >>>>>> field. The problem is that since I cannot emit null fields, I emit a
> >>>>>> dummy message for each new vertex state and a dummy vertex state for
> >>>>>> each new message. That essentially means that the intermediate message
> >>>>>> result is double in size if, say, the vertex values are of the same
> >>>>>> type as the messages (and can be worse if the vertex values are more
> >>>>>> complex).
> >>>>>> So my question is: is there a way to avoid this redundancy, by either
> >>>>>> emitting null fields or by creating an operator that could emit 2
> >>>>>> different types of tuples?
> >>>>>>
> >>>>>> Thanks!
> >>>>>> -Vasia.
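For illustration, a rough sketch of the workaround described above; the field types are assumptions, not the prototype's actual code. The coGroup emits one wrapper record for both kinds of output (with a dummy in the unused field), and two flatMaps split the stream again afterwards.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Illustrative types: f0 = flag, f1 = vertex value (or dummy), f2 = message
// (or dummy). Half of every record is a dummy field, which is exactly the
// redundancy the question above is about.
public class SplitCoGroupOutput {

    public static class ExtractVertexValues
            implements FlatMapFunction<Tuple3<Boolean, Double, Double>, Double> {
        @Override
        public void flatMap(Tuple3<Boolean, Double, Double> rec, Collector<Double> out) {
            if (rec.f0) {
                out.collect(rec.f1);   // keep only the vertex-value records
            }
        }
    }

    public static class ExtractMessages
            implements FlatMapFunction<Tuple3<Boolean, Double, Double>, Double> {
        @Override
        public void flatMap(Tuple3<Boolean, Double, Double> rec, Collector<Double> out) {
            if (!rec.f0) {
                out.collect(rec.f2);   // keep only the message records
            }
        }
    }
}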
> >>>>>>
> >>>>>> On 9 November 2015 at 15:20, Fabian Hueske <fhueske@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hi Vasia,
> >>>>>>>
> >>>>>>> sorry for the late reply.
> >>>>>>> I don't think there is a big difference. In both cases, the
> >>>>>>> partitioning and sorting happens at the end of the iteration.
> >>>>>>> If the groupReduce is applied before the workset is returned, the
> >>>>>>> sorting happens on the filtered result (after the flatMap), which
> >>>>>>> might be a little bit more efficient (depending on the ratio of
> >>>>>>> messages and solution set updates). Also, it does not require that
> >>>>>>> the initial workset is sorted for the first groupReduce.
> >>>>>>>
> >>>>>>> I would put it at the end.
> >>>>>>>
> >>>>>>> Cheers, Fabian
> >>>>>>>
> >>>>>>> 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikalavri@gmail.com>:
> >>>>>>>
> >>>>>>>> @Fabian
> >>>>>>>>
> >>>>>>>> Is there any advantage in putting the reducer-combiner before
> >>>>>>>> updating the workset vs. after (i.e. right before the join with
> >>>>>>>> the solution set)?
> >>>>>>>>
> >>>>>>>> If it helps, here are the plans of these 2 alternatives:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> >>>>>>>>
> >>>>>>>> Thanks a lot for the help!
> >>>>>>>>
> >>>>>>>> -Vasia.
> >>>>>>>>
> >>>>>>>> On 30 October 2015 at 21:28, Fabian Hueske <fhueske@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> We can of course inject an optional ReduceFunction (or
> >>>>>>>>> GroupReduce, or combinable GroupReduce) to reduce the size of the
> >>>>>>>>> work set.
> >>>>>>>>> I suggested to remove the GroupReduce function because it only
> >>>>>>>>> collected all messages into a single record by emitting the input
> >>>>>>>>> iterator, which is quite dangerous. Applying a combinable reduce
> >>>>>>>>> function could improve the performance considerably.
> >>>>>>>>>
> >>>>>>>>> The good news is that it would come "for free", because the
> >>>>>>>>> necessary partitioning and sorting can be reused (given the
> >>>>>>>>> forwardField annotations are correctly set):
> >>>>>>>>> - The partitioning of the reduce can be reused for the join with
> >>>>>>>>>   the solution set
> >>>>>>>>> - The sort of the reduce is preserved by the join with the
> >>>>>>>>>   in-memory hash-table of the solution set and can be reused for
> >>>>>>>>>   the coGroup.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Fabian
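A minimal sketch of the kind of combinable reduce Fabian suggests. The message type (Tuple2 of target vertex id and candidate distance) and the min semantics are illustrative, as in SSSP; grouping on field 0 is what lets the partitioning be reused downstream.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class MessageCombiner {

    public static DataSet<Tuple2<Long, Double>> combine(DataSet<Tuple2<Long, Double>> messages) {
        return messages
            .groupBy(0)   // group messages by target vertex id
            .reduce(new ReduceFunction<Tuple2<Long, Double>>() {
                @Override
                public Tuple2<Long, Double> reduce(Tuple2<Long, Double> m1,
                                                   Tuple2<Long, Double> m2) {
                    // keep the smaller candidate distance per target vertex
                    return m1.f1 <= m2.f1 ? m1 : m2;
                }
            });
    }
}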
> >>>>>>>>>
> >>>>>>>>> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikalavri@gmail.com>:
> >>>>>>>>>
> >>>>>>>>>> Hi Fabian,
> >>>>>>>>>>
> >>>>>>>>>> thanks so much for looking into this so quickly :-)
> >>>>>>>>>>
> >>>>>>>>>> One update I have to make is that I tried running a few
> >>>>>>>>>> experiments with this on a 6-node cluster. The current
> >>>>>>>>>> implementation gets stuck at "Rebuilding Workset Properties" and
> >>>>>>>>>> never finishes a single iteration. Running the plan of one
> >>>>>>>>>> superstep without a delta iteration terminates fine. I didn't
> >>>>>>>>>> have access to the cluster today, so I couldn't debug this
> >>>>>>>>>> further, but I will do so as soon as I have access again.
> >>>>>>>>>>
> >>>>>>>>>> The rest of my comments are inline:
> >>>>>>>>>>
> >>>>>>>>>> On 30 October 2015 at 17:53, Fabian Hueske <fhueske@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Vasia,
> >>>>>>>>>>>
> >>>>>>>>>>> I had a look at your new implementation and have a few ideas
> >>>>>>>>>>> for improvements.
> >>>>>>>>>>> 1) Sending out the input iterator as you do in the last
> >>>>>>>>>>> GroupReduce is quite dangerous and does not give a benefit
> >>>>>>>>>>> compared to collecting all elements. Even though it is an
> >>>>>>>>>>> iterator, it needs to be completely materialized in memory
> >>>>>>>>>>> whenever the record is touched by Flink or user code.
> >>>>>>>>>>> I would propose to skip the reduce step completely, handle all
> >>>>>>>>>>> messages separately, and only collect them in the CoGroup
> >>>>>>>>>>> function before giving them to the VertexComputeFunction. Be
> >>>>>>>>>>> careful to only do that with objectReuse disabled, or take care
> >>>>>>>>>>> to properly copy the messages. If you collect the messages in
> >>>>>>>>>>> the CoGroup, you don't need the GroupReduce, you have smaller
> >>>>>>>>>>> records, and you can remove the MessageIterator class
> >>>>>>>>>>> completely.
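A rough sketch of what point 1 suggests; the names and record types are assumptions, not the prototype's code. The incoming messages are collected into a plain list inside the coGroup, copying each record so that enabled object reuse cannot corrupt the inbox.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Illustrative types: first input = vertex with state, second input = the
// individual messages for that vertex, output = updated vertex.
public class CollectMessagesInCoGroup
        implements CoGroupFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {

    @Override
    public void coGroup(Iterable<Tuple2<Long, Double>> verticesWithState,
                        Iterable<Tuple2<Long, Double>> messages,
                        Collector<Tuple2<Long, Double>> out) {
        // collect the inbox into a plain list; copy each record, since with
        // object reuse enabled the runtime may recycle the same instance
        List<Tuple2<Long, Double>> inbox = new ArrayList<>();
        for (Tuple2<Long, Double> msg : messages) {
            inbox.add(new Tuple2<>(msg.f0, msg.f1));
        }
        // ... hand 'inbox' to the compute function and emit its results via 'out'
    }
}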
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I see. The idea was to expose a message combiner that the user
> >>>>>>>>>> could implement if the messages are combinable, e.g. min, sum.
> >>>>>>>>>> This is a common case and reduces the message load significantly.
> >>>>>>>>>> Is there a way I could do something similar before the coGroup?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 2) Add this annotation to the AppendVertexState function:
> >>>>>>>>>>> @ForwardedFieldsFirst("*->f0"). This indicates that the complete
> >>>>>>>>>>> element of the first input becomes the first field of the
> >>>>>>>>>>> output. Since the input is partitioned on "f0" (it comes out of
> >>>>>>>>>>> the partitioned solution set), the result of AppendVertexState
> >>>>>>>>>>> will be partitioned on "f0.f0", which is (accidentally :-D) the
> >>>>>>>>>>> join key of the following coGroup function -> no partitioning :-)
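For illustration, a sketch of what such an annotated join function could look like; the vertex and message types are assumptions, not the prototype's code. The annotation tells the optimizer that the whole first input (the vertex) is forwarded unchanged into field f0 of the output, so a partitioning on f0 of the first input carries over as a partitioning on "f0.f0" of the result.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;

@ForwardedFieldsFirst("*->f0")
public class AppendVertexState
        implements JoinFunction<Vertex<Long, Double>, Tuple2<Long, Double>,
                                Tuple2<Vertex<Long, Double>, Double>> {

    @Override
    public Tuple2<Vertex<Long, Double>, Double> join(Vertex<Long, Double> vertex,
                                                     Tuple2<Long, Double> message) {
        // the vertex becomes f0 of the output, exactly as the annotation declares
        return new Tuple2<>(vertex, message.f1);
    }
}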
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Great! I totally missed that ;)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 3) Adding the two flatMap functions behind the CoGroup prevents
> >>>>>>>>>>> chaining and therefore causes some serialization overhead, but
> >>>>>>>>>>> it shouldn't be too bad.
> >>>>>>>>>>>
> >>>>>>>>>>> So in total I would make this program as follows:
> >>>>>>>>>>>
> >>>>>>>>>>> iVertices<K, VV>
> >>>>>>>>>>> iMessages<K, Message> = iVertices.map(new InitWorkSet());
> >>>>>>>>>>>
> >>>>>>>>>>> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> >>>>>>>>>>> verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> >>>>>>>>>>>   .join(iteration.getWorkset())
> >>>>>>>>>>>   .where(0)   // solution set is local and build side
> >>>>>>>>>>>   .equalTo(0) // workset is shuffled and probe side of hash join
> >>>>>>>>>>> superstepComp<Vertex, Tuple2<K, Message>, Bool> =
> >>>>>>>>>>>   verticesWithMessage.coGroup(edgesWithValue)
> >>>>>>>>>>>   .where("f0.f0") // vwm is locally forwarded and sorted
> >>>>>>>>>>>   .equalTo(0)     // edges are already partitioned and sorted
> >>>>>>>>>>>                   // (if cached correctly)
> >>>>>>>>>>>   .with(...)      // the coGroup collects all messages in a
> >>>>>>>>>>>                   // collection and gives it to the ComputeFunction
> >>>>>>>>>>> delta<Vertex> = superstepComp.flatMap(...)
> >>>>>>>>>>>   // partitioned when merged into the solution set
> >>>>>>>>>>> workSet<K, Message> = superstepComp.flatMap(...)
> >>>>>>>>>>>   // partitioned for the join
> >>>>>>>>>>> iteration.closeWith(delta, workSet)
> >>>>>>>>>>>
> >>>>>>>>>>> So, if I am correct, the program will
> >>>>>>>>>>> - partition the workset
> >>>>>>>>>>> - sort the vertices with messages
> >>>>>>>>>>> - partition the delta
> >>>>>>>>>>>
> >>>>>>>>>>> One observation I have is that this program requires that all
> >>>>>>>>>>> messages fit into memory. Was that also the case before?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I believe not. The plan has one coGroup that produces the
> >>>>>>>>>> messages and a following coGroup that groups by the messages'
> >>>>>>>>>> target ID and consumes them in an iterator. That doesn't require
> >>>>>>>>>> them to fit in memory, right?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I'm also working on a version where the graph is represented as
> >>>>>>>>>> an adjacency list, instead of two separate datasets of vertices
> >>>>>>>>>> and edges. The disadvantage is that the graph has to fit in
> >>>>>>>>>> memory, but I think the advantages are many. We'll be able to
> >>>>>>>>>> support edge value updates, edge mutations and different edge
> >>>>>>>>>> access order guarantees. I'll get back to this thread when I have
> >>>>>>>>>> a working prototype.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Fabian
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks again!
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> -Vasia.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikalavri@gmail.com>:
> >>>>>>>>>>>
> >>>>>>>>>>>> @Martin: thanks for your input! If you ran into any other
> >>>>>>>>>>>> issues that I didn't mention, please let us know. Obviously,
> >>>>>>>>>>>> even with my proposal, there are still features we cannot
> >>>>>>>>>>>> support, e.g. updating edge values and graph mutations. We'll
> >>>>>>>>>>>> need to re-think the underlying iteration and/or graph
> >>>>>>>>>>>> representation for those.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Fabian: thanks a lot, no rush :)
> >>>>>>>>>>>> Let me give you some more information that might make it easier
> >>>>>>>>>>>> to reason about performance:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Currently, in Spargel the SolutionSet (SS) keeps the vertex
> >>>>>>>>>>>> state and the workset (WS) keeps the active vertices. The
> >>>>>>>>>>>> iteration is composed of 2 coGroups. The first one takes the WS
> >>>>>>>>>>>> and the edges and produces messages. The second one takes the
> >>>>>>>>>>>> messages and the SS and produces the new WS and the SS-delta.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my proposal, the SS has the vertex state and the WS has
> >>>>>>>>>>>> <vertexId, MessageIterator> pairs, i.e. the inbox of each
> >>>>>>>>>>>> vertex. The plan is more complicated because compute() needs to
> >>>>>>>>>>>> have two iterators: over the edges and over the messages.
> >>>>>>>>>>>> First, I join SS and WS to get the active vertices (those that
> >>>>>>>>>>>> have received a msg) and their current state. Then I coGroup
> >>>>>>>>>>>> the result with the edges to access the neighbors. Now the main
> >>>>>>>>>>>> problem is that this coGroup needs to have 2 outputs: the new
> >>>>>>>>>>>> messages and the new vertex value. I couldn't really find a
> >>>>>>>>>>>> nice way to do this, so I'm emitting a Tuple that contains both
> >>>>>>>>>>>> types and I have a flag to separate them later with 2 flatMaps.
> >>>>>>>>>>>> From the vertex flatMap, I create the SS-delta and from the
> >>>>>>>>>>>> messages flatMap I apply a reduce to group the messages by
> >>>>>>>>>>>> vertex and send them to the new WS. One optimization would be
> >>>>>>>>>>>> to expose a combiner here to reduce message size.
> >>>>>>>>>>>>
> >>>>>>>>>>>> tl;dr:
> >>>>>>>>>>>> 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> >>>>>>>>>>>> 2. how can we efficiently emit 2 different types of records
> >>>>>>>>>>>>    from a coGroup?
> >>>>>>>>>>>> 3. does it make any difference if we group/combine the messages
> >>>>>>>>>>>>    before updating the workset or after?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> -Vasia.
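Illustrating question 2 of the tl;dr above: a sketch of the direction the thread later takes (an Either type, cf. FLINK-3002), where each coGroup output record carries either a new vertex value or a message and no dummy fields are needed. 'Either' here refers to a placeholder type like the one sketched near the top of this thread; all field types (vertex values and messages as Double, SSSP-style distances) are assumptions.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class EmitEitherFromCoGroup
        implements CoGroupFunction<Tuple2<Long, Double>,                  // vertex with state
                                   Tuple2<Long, Double>,                  // edge: (target, weight)
                                   Tuple2<Long, Either<Double, Double>>> {

    @Override
    public void coGroup(Iterable<Tuple2<Long, Double>> vertexWithState,
                        Iterable<Tuple2<Long, Double>> edges,
                        Collector<Tuple2<Long, Either<Double, Double>>> out) {
        // there is at most one vertex record per key
        Tuple2<Long, Double> vertex = null;
        for (Tuple2<Long, Double> v : vertexWithState) {
            vertex = v;
        }
        if (vertex == null) {
            return;
        }
        // new vertex value -> "left" alternative
        out.collect(new Tuple2<>(vertex.f0, Either.<Double, Double>left(vertex.f1)));
        // one message per out-edge -> "right" alternative
        for (Tuple2<Long, Double> edge : edges) {
            out.collect(new Tuple2<>(edge.f0, Either.<Double, Double>right(vertex.f1 + edge.f1)));
        }
    }
}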
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 27 October 2015 at 18:39, Fabian Hueske <fhueske@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I'll try to have a look at the proposal from a performance
> >>>>>>>>>>>>> point of view in the next few days.
> >>>>>>>>>>>>> Please ping me if I don't follow up on this thread.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers, Fabian
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.junghanns@mailbox.org>:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> At our group, we also moved several algorithms from Giraph to
> >>>>>>>>>>>>>> Gelly and ran into some confusing issues (first in
> >>>>>>>>>>>>>> understanding, second during implementation) caused by the
> >>>>>>>>>>>>>> conceptual differences you described.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If there are no concrete advantages (performance mainly) in
> >>>>>>>>>>>>>> the Spargel implementation, we would be very happy to see the
> >>>>>>>>>>>>>> Gelly API be aligned to Pregel-like systems.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Your SSSP example speaks for itself. Straightforward, if the
> >>>>>>>>>>>>>> reader is familiar with Pregel/Giraph/...
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Martin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello squirrels,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I want to discuss with you a few concerns I have about our
> >>>>>>>>>>>>>>> current vertex-centric model implementation, Spargel, now
> >>>>>>>>>>>>>>> fully subsumed by Gelly.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Spargel is our implementation of Pregel [1], but it violates
> >>>>>>>>>>>>>>> some fundamental properties of the model, as described in
> >>>>>>>>>>>>>>> the paper and as implemented in e.g. Giraph, GPS, Hama. I
> >>>>>>>>>>>>>>> often find myself confused both when trying to explain it to
> >>>>>>>>>>>>>>> current Giraph users and when porting my Giraph algorithms
> >>>>>>>>>>>>>>> to it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> More specifically:
> >>>>>>>>>>>>>>> - in the Pregel model, messages produced in superstep n are
> >>>>>>>>>>>>>>>   received in superstep n+1. In Spargel, they are produced
> >>>>>>>>>>>>>>>   and consumed in the same iteration.
> >>>>>>>>>>>>>>> - in Pregel, vertices are active during a superstep if they
> >>>>>>>>>>>>>>>   have received a message in the previous superstep. In
> >>>>>>>>>>>>>>>   Spargel, a vertex is active during a superstep if it has
> >>>>>>>>>>>>>>>   changed its value.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> These two differences require a lot of rethinking when
> >>>>>>>>>>>>>>> porting applications and can easily cause bugs.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The most important problem, however, is that we require the
> >>>>>>>>>>>>>>> user to split the computation in 2 phases (2 UDFs):
> >>>>>>>>>>>>>>> - messaging: has access to the vertex state and can produce
> >>>>>>>>>>>>>>>   messages
> >>>>>>>>>>>>>>> - update: has access to incoming messages and can update the
> >>>>>>>>>>>>>>>   vertex value
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Pregel/Giraph only expose one UDF to the user:
> >>>>>>>>>>>>>>> - compute: has access to both the vertex state and the
> >>>>>>>>>>>>>>>   incoming messages, can produce messages and update the
> >>>>>>>>>>>>>>>   vertex value.
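To make the contrast concrete, here are deliberately simplified, illustrative interfaces; the real Spargel/Gelly and Giraph signatures differ in their details, so treat these purely as a sketch of the two programming models.

// Spargel / scatter-gather style: two separate UDFs per superstep.
interface MessagingFunction<K, VV, Message> {
    // sees the vertex state, produces messages for neighbors
    void sendMessages(K vertexId, VV vertexValue);
}

interface VertexUpdateFunction<K, VV, Message> {
    // sees the incoming messages, may update the vertex value
    void updateVertex(K vertexId, VV vertexValue, Iterable<Message> inMessages);
}

// Pregel / Giraph style: one UDF that sees both sides at once.
interface ComputeFunction<K, VV, Message> {
    // sees the vertex state and the incoming messages, can update the value
    // and send new messages in the same call
    void compute(K vertexId, VV vertexValue, Iterable<Message> inMessages);
}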
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This might not seem like a big deal, but apart from forcing
> >>>>>>>>>>>>>>> the user to split their program logic into 2 phases, Spargel
> >>>>>>>>>>>>>>> also makes some common computation patterns non-intuitive or
> >>>>>>>>>>>>>>> impossible to write. A very simple example is propagating a
> >>>>>>>>>>>>>>> message based on its value or sender ID. To do this with
> >>>>>>>>>>>>>>> Spargel, one has to store all the incoming messages in the
> >>>>>>>>>>>>>>> vertex value (which might be of a different type, btw)
> >>>>>>>>>>>>>>> during the messaging phase, so that they can be accessed
> >>>>>>>>>>>>>>> during the update phase.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So, my first question is: when implementing Spargel, were
> >>>>>>>>>>>>>>> other alternatives considered and maybe rejected in favor of
> >>>>>>>>>>>>>>> performance or because of some other reason? If someone
> >>>>>>>>>>>>>>> knows, I would love to hear about them!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Second, I wrote a prototype implementation [2] that only
> >>>>>>>>>>>>>>> exposes one UDF, compute(), by keeping the vertex state in
> >>>>>>>>>>>>>>> the solution set and the messages in the workset. This way
> >>>>>>>>>>>>>>> all previously mentioned limitations go away and the API
> >>>>>>>>>>>>>>> (see "SSSPComputeFunction" in the example [3]) looks a lot
> >>>>>>>>>>>>>>> more like Giraph (see [4]).
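For readers who do not follow the links, a rough sketch of what a single-UDF SSSP compute function looks like in this style. It is illustrative only; the actual SSSPComputeFunction in [3] and the Giraph version in [4] differ in details, and the helper methods used here (setNewVertexValue, getEdges, sendMessageTo) stand in for whatever the iteration framework provides.

public class SketchSSSPComputeFunction {

    public void compute(long vertexId, double currentDistance, Iterable<Double> messages) {
        // pick the smallest candidate distance among the incoming messages
        double minDistance = currentDistance;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        if (minDistance < currentDistance) {
            // update own state and propagate new candidate distances
            setNewVertexValue(minDistance);
            for (Edge edge : getEdges()) {
                sendMessageTo(edge.target(), minDistance + edge.weight());
            }
        }
    }

    // stubs standing in for framework-provided methods (assumptions)
    private void setNewVertexValue(double value) {}
    private Iterable<Edge> getEdges() { return java.util.Collections.emptyList(); }
    private void sendMessageTo(long target, double message) {}

    interface Edge {
        long target();
        double weight();
    }
}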
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have not run any experiments yet and the prototype has
> >>>>>>>>>>>>>>> some ugly hacks, but if you think any of this makes sense,
> >>>>>>>>>>>>>>> then I'd be willing to follow up and try to optimize it. If
> >>>>>>>>>>>>>>> we see that it performs well, we can consider either
> >>>>>>>>>>>>>>> replacing Spargel or adding it as an alternative.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for reading this long e-mail and looking forward to
> >>>>>>>>>>>>>>> your input!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> -Vasia.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]:
> >>>> https://kowshik.github.io/JPregel/pregel_paper.pdf
> >>>>>>>>>>>>>>> [2]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> >>>>>>>>>>>>>>> [3]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> >>>>>>>>>>>>>>> [4]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
>
>
