From: Fabian Hueske
Date: Mon, 9 Nov 2015 15:20:38 +0100
Subject: Re: [gelly] Spargel model rework
To: dev@flink.apache.org

Hi Vasia,

sorry for the late reply.

I don't think there is a big difference. In both cases, the partitioning
and sorting happen at the end of the iteration.

If the groupReduce is applied before the workset is returned, the sorting
happens on the filtered result (after the flatMap), which might be a
little more efficient (depending on the ratio of messages to solution set
updates). It also does not require that the initial workset be sorted for
the first groupReduce.
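For illustration, a minimal sketch of such a combining reduce at the end
of the step function (hypothetical names and types, a min-combiner as the
user-supplied function, assuming the 0.10-era DataSet API; workSet, delta,
and iteration are assumed to exist as in the plan discussed below):

    // combine the messages per target vertex before they become the new
    // workset, so the combine can reuse the iteration's partitioning
    DataSet<Tuple2<Long, Double>> combinedWorkSet = workSet
        .groupBy(0) // group messages by target vertex id
        .reduce(new ReduceFunction<Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> reduce(Tuple2<Long, Double> a,
                                               Tuple2<Long, Double> b) {
                a.f1 = Math.min(a.f1, b.f1); // e.g. min for SSSP
                return a;
            }
        });
    iteration.closeWith(delta, combinedWorkSet);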
I would put it at the end.

Cheers,
Fabian

2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri:

> @Fabian
>
> Is there any advantage in putting the reducer-combiner before updating
> the workset vs. after (i.e. right before the join with the solution set)?
>
> If it helps, here are the plans of these 2 alternatives:
>
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
>
> Thanks a lot for the help!
>
> -Vasia.
>
> On 30 October 2015 at 21:28, Fabian Hueske wrote:
>
> > We can of course inject an optional ReduceFunction (or GroupReduce, or
> > combinable GroupReduce) to reduce the size of the work set.
> > I suggested removing the GroupReduce function because it only
> > collected all messages into a single record by emitting the input
> > iterator, which is quite dangerous. Applying a combinable reduce
> > function could improve the performance considerably.
> >
> > The good news is that it would come "for free" because the necessary
> > partitioning and sorting can be reused (given the forwardField
> > annotations are correctly set):
> > - The partitioning of the reduce can be reused for the join with the
> > solution set.
> > - The sort of the reduce is preserved by the join with the in-memory
> > hash table of the solution set and can be reused for the coGroup.
> >
> > Best,
> > Fabian
> >
> > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri:
> >
> > > Hi Fabian,
> > >
> > > thanks so much for looking into this so quickly :-)
> > >
> > > One update I have to make is that I tried running a few experiments
> > > with this on a 6-node cluster. The current implementation gets stuck
> > > at "Rebuilding Workset Properties" and never finishes a single
> > > iteration. Running the plan of one superstep without a delta
> > > iteration terminates fine. I didn't have access to the cluster
> > > today, so I couldn't debug this further, but I will do so as soon as
> > > I have access again.
> > >
> > > The rest of my comments are inline:
> > >
> > > On 30 October 2015 at 17:53, Fabian Hueske wrote:
> > >
> > > > Hi Vasia,
> > > >
> > > > I had a look at your new implementation and have a few ideas for
> > > > improvements.
> > > >
> > > > 1) Sending out the input iterator as you do in the last
> > > > GroupReduce is quite dangerous and does not give a benefit
> > > > compared to collecting all elements. Even though it is an
> > > > iterator, it needs to be completely materialized in-memory
> > > > whenever the record is touched by Flink or user code.
> > > > I would propose to skip the reduce step completely, handle all
> > > > messages separately, and only collect them in the CoGroup function
> > > > before giving them into the VertexComputeFunction. Be careful to
> > > > only do that with objectReuse disabled, or take care to properly
> > > > copy the messages. If you collect the messages in the CoGroup, you
> > > > don't need the GroupReduce, you have smaller records, and you can
> > > > remove the MessageIterator class completely.
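For illustration, collecting the messages inside the CoGroup might look
roughly like the following sketch (class name and types are invented, not
the prototype's actual code; it assumes object reuse is disabled, per the
caveat above):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    // first input: (vertex id, message value) pairs out of the join,
    // second input: (source, target, weight) edges
    public class ApplyCompute implements
            CoGroupFunction<Tuple2<Long, Double>,
                            Tuple3<Long, Long, Double>,
                            Tuple2<Long, Double>> {
        @Override
        public void coGroup(Iterable<Tuple2<Long, Double>> verticesWithMsgs,
                            Iterable<Tuple3<Long, Long, Double>> edges,
                            Collector<Tuple2<Long, Double>> out) {
            // materialize the message values; safe only with objectReuse
            // disabled (otherwise copy each record)
            List<Double> inbox = new ArrayList<>();
            for (Tuple2<Long, Double> v : verticesWithMsgs) {
                inbox.add(v.f1);
            }
            // ... pass 'inbox' and 'edges' to the user's compute function
            // and emit the updated vertex via 'out'
        }
    }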
> > > I see. The idea was to expose a message combiner that the user
> > > could implement if the messages are combinable, e.g. min or sum.
> > > This is a common case and reduces the message load significantly.
> > > Is there a way I could do something similar before the coGroup?
> > >
> > > > 2) Add this annotation to the AppendVertexState function:
> > > > @ForwardedFieldsFirst("*->f0"). This indicates that the complete
> > > > element of the first input becomes the first field of the output.
> > > > Since the input is partitioned on "f0" (it comes out of the
> > > > partitioned solution set), the result of AppendVertexState will
> > > > be partitioned on "f0.f0", which is (accidentally :-D) the join
> > > > key of the following coGroup function -> no partitioning :-)
> > >
> > > Great! I totally missed that ;)
> > >
> > > > 3) Adding the two flatMap functions behind the CoGroup prevents
> > > > chaining and therefore causes some serialization overhead, but it
> > > > shouldn't be too bad.
> > > >
> > > > So in total I would make this program as follows:
> > > >
> > > > // iVertices: the input vertex data set
> > > > iMessages = iVertices.map(new InitWorkSet());
> > > >
> > > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > > > verticesWithMessage = iteration.getSolutionSet()
> > > >   .join(iteration.getWorkset())
> > > >   .where(0)   // solution set is local and build side
> > > >   .equalTo(0) // workset is shuffled and probe side of hash join
> > > > superstepComp = verticesWithMessage.coGroup(edgesWithValue)
> > > >   .where("f0.f0") // vwm is locally forwarded and sorted
> > > >   .equalTo(0)     // edges are already partitioned and sorted
> > > >                   // (if cached correctly)
> > > >   .with(...) // the coGroup collects all messages in a collection
> > > >              // and gives it to the ComputeFunction
> > > > delta = superstepComp.flatMap(...)   // partitioned when merged
> > > >                                      // into the solution set
> > > > workSet = superstepComp.flatMap(...) // partitioned for the join
> > > > iteration.closeWith(delta, workSet)
> > > >
> > > > So, if I am correct, the program will
> > > > - partition the workset
> > > > - sort the vertices with messages
> > > > - partition the delta
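For illustration, the two flatMaps at the end of the sketch above could
split a flagged coGroup output roughly like this (names and types are
invented; a Boolean field is assumed to mark vertex-state updates vs.
messages):

    // superstepComp: Tuple2<Boolean, Tuple2<Long, Double>> where f0
    // tells vertex updates (true) apart from messages (false)
    DataSet<Tuple2<Long, Double>> delta = superstepComp
        .flatMap(new FlatMapFunction<Tuple2<Boolean, Tuple2<Long, Double>>,
                                     Tuple2<Long, Double>>() {
            @Override
            public void flatMap(Tuple2<Boolean, Tuple2<Long, Double>> rec,
                                Collector<Tuple2<Long, Double>> out) {
                if (rec.f0) {
                    out.collect(rec.f1); // keep only vertex-state updates
                }
            }
        });
    // the workset flatMap is symmetric, keeping records with rec.f0 == false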
> > > > One observation I have is that this program requires that all
> > > > messages fit into memory. Was that also the case before?
> > >
> > > I believe not. The plan has one coGroup that produces the messages
> > > and a following coGroup that groups by the messages' target ID and
> > > consumes them in an iterator. That doesn't require them to fit in
> > > memory, right?
> > >
> > > I'm also working on a version where the graph is represented as an
> > > adjacency list, instead of two separate datasets of vertices and
> > > edges. The disadvantage is that the graph has to fit in memory, but
> > > I think the advantages are many. We'll be able to support edge
> > > value updates, edge mutations, and different edge access order
> > > guarantees. I'll get back to this thread when I have a working
> > > prototype.
> > >
> > > Thanks again!
> > >
> > > Cheers,
> > > -Vasia.
> > >
> > > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikalavri@gmail.com>:
> > > >
> > > > > @Martin: thanks for your input! If you ran into any other
> > > > > issues that I didn't mention, please let us know. Obviously,
> > > > > even with my proposal, there are still features we cannot
> > > > > support, e.g. updating edge values and graph mutations. We'll
> > > > > need to re-think the underlying iteration and/or graph
> > > > > representation for those.
> > > > >
> > > > > @Fabian: thanks a lot, no rush :)
> > > > > Let me give you some more information that might make it easier
> > > > > to reason about performance:
> > > > >
> > > > > Currently, in Spargel the solution set (SS) keeps the vertex
> > > > > state and the workset (WS) keeps the active vertices. The
> > > > > iteration is composed of 2 coGroups. The first one takes the WS
> > > > > and the edges and produces messages. The second one takes the
> > > > > messages and the SS and produces the new WS and the SS-delta.
> > > > >
> > > > > In my proposal, the SS has the vertex state and the WS has
> > > > > (vertex ID, MessageIterator) pairs, i.e. the inbox of each
> > > > > vertex. The plan is more complicated because compute() needs to
> > > > > have two iterators: over the edges and over the messages.
> > > > > First, I join SS and WS to get the active vertices (those that
> > > > > have received a msg) and their current state. Then I coGroup
> > > > > the result with the edges to access the neighbors. Now the main
> > > > > problem is that this coGroup needs to have 2 outputs: the new
> > > > > messages and the new vertex value. I couldn't really find a
> > > > > nice way to do this, so I'm emitting a Tuple that contains both
> > > > > types and I have a flag to separate them later with 2 flatMaps.
> > > > > From the vertex flatMap, I create the SS-delta, and from the
> > > > > messages flatMap I apply a reduce to group the messages by
> > > > > vertex and send them to the new WS. One optimization would be
> > > > > to expose a combiner here to reduce message size.
> > > > >
> > > > > tl;dr:
> > > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> > > > > 2. how can we efficiently emit 2 different types of records
> > > > > from a coGroup?
> > > > > 3. does it make any difference if we group/combine the messages
> > > > > before updating the workset or after?
> > > > >
> > > > > Cheers,
> > > > > -Vasia.
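To make question 1 concrete, a rough skeleton of the two plans (UDF names
and types invented for illustration; the proposal's chain is the one
sketched earlier in the thread):

    // current Spargel step: two coGroups
    DataSet<Tuple2<Long, Double>> messages = workSet
        .coGroup(edgesWithValue).where(0).equalTo(0)
        .with(new MessagingUdf());  // WS + edges -> messages
    DataSet<Vertex<Long, Double>> delta = messages
        .coGroup(iteration.getSolutionSet()).where(0).equalTo(0)
        .with(new UpdateUdf());     // messages + SS -> new WS / SS-delta

    // proposed step: join + coGroup + 2 flatMaps + reduce
    //   SS join WS                -> active vertices with their inbox
    //   coGroup with edges        -> flagged (update | message) records
    //   flatMap x2                -> SS-delta and raw messages
    //   reduce (combiner)         -> new WS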
> > > > > On 27 October 2015 at 18:39, Fabian Hueske wrote:
> > > > >
> > > > > > I'll try to have a look at the proposal from a performance
> > > > > > point of view in the next days.
> > > > > > Please ping me if I don't follow up this thread.
> > > > > >
> > > > > > Cheers, Fabian
> > > > > >
> > > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.junghanns@mailbox.org>:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > At our group, we also moved several algorithms from Giraph
> > > > > > > to Gelly and ran into some confusing issues (first in
> > > > > > > understanding, second during implementation) caused by the
> > > > > > > conceptual differences you described.
> > > > > > >
> > > > > > > If there are no concrete advantages (performance mainly) in
> > > > > > > the Spargel implementation, we would be very happy to see
> > > > > > > the Gelly API be aligned to Pregel-like systems.
> > > > > > >
> > > > > > > Your SSSP example speaks for itself. Straightforward, if
> > > > > > > the reader is familiar with Pregel/Giraph/...
> > > > > > >
> > > > > > > Best,
> > > > > > > Martin
> > > > > > >
> > > > > > > On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> > > > > > >
> > > > > > >> Hello squirrels,
> > > > > > >>
> > > > > > >> I want to discuss with you a few concerns I have about our
> > > > > > >> current vertex-centric model implementation, Spargel, now
> > > > > > >> fully subsumed by Gelly.
> > > > > > >>
> > > > > > >> Spargel is our implementation of Pregel [1], but it
> > > > > > >> violates some fundamental properties of the model, as
> > > > > > >> described in the paper and as implemented in e.g. Giraph,
> > > > > > >> GPS, and Hama. I often find myself confused both when
> > > > > > >> trying to explain it to current Giraph users and when
> > > > > > >> porting my Giraph algorithms to it.
> > > > > > >>
> > > > > > >> More specifically:
> > > > > > >> - in the Pregel model, messages produced in superstep n
> > > > > > >> are received in superstep n+1. In Spargel, they are
> > > > > > >> produced and consumed in the same iteration.
> > > > > > >> - in Pregel, vertices are active during a superstep if
> > > > > > >> they have received a message in the previous superstep. In
> > > > > > >> Spargel, a vertex is active during a superstep if it has
> > > > > > >> changed its value.
> > > > > > >>
> > > > > > >> These two differences require a lot of rethinking when
> > > > > > >> porting applications and can easily cause bugs.
> > > > > > >>
> > > > > > >> The most important problem, however, is that we require
> > > > > > >> the user to split the computation into 2 phases (2 UDFs):
> > > > > > >> - messaging: has access to the vertex state and can
> > > > > > >> produce messages
> > > > > > >> - update: has access to incoming messages and can update
> > > > > > >> the vertex value
> > > > > > >>
> > > > > > >> Pregel/Giraph only expose one UDF to the user:
> > > > > > >> - compute: has access to both the vertex state and the
> > > > > > >> incoming messages, can produce messages and update the
> > > > > > >> vertex value.
> > > > > > >>
> > > > > > >> This might not seem like a big deal, but apart from
> > > > > > >> forcing the user to split their program logic into 2
> > > > > > >> phases, Spargel also makes some common computation
> > > > > > >> patterns non-intuitive or impossible to write. A very
> > > > > > >> simple example is propagating a message based on its value
> > > > > > >> or sender ID. To do this with Spargel, one has to store
> > > > > > >> all the incoming messages in the vertex value (which might
> > > > > > >> be of a different type, btw) during the update phase, so
> > > > > > >> that they can be accessed during the messaging phase.
> > > > > > >>
> > > > > > >> So, my first question is: when implementing Spargel, were
> > > > > > >> other alternatives considered and maybe rejected in favor
> > > > > > >> of performance or for some other reason? If someone knows,
> > > > > > >> I would love to hear about them!
> > > > > > >>
> > > > > > >> Second, I wrote a prototype implementation [2] that only
> > > > > > >> exposes one UDF, compute(), by keeping the vertex state in
> > > > > > >> the solution set and the messages in the workset. This way
> > > > > > >> all previously mentioned limitations go away and the API
> > > > > > >> (see "SSSPComputeFunction" in the example [3]) looks a lot
> > > > > > >> more like Giraph (see [4]).
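For a flavor of the single-UDF style, here is a hypothetical SSSP compute
function in the Giraph pattern (class and method names are illustrative,
not necessarily the prototype's actual API; see [3] for the real example):

    public static final class SSSPComputeFunction
            extends ComputeFunction<Long, Double, Double, Double> {
        @Override
        public void compute(Vertex<Long, Double> vertex,
                            MessageIterator<Double> messages) {
            // minimum distance announced by any in-neighbor
            double minDistance = Double.POSITIVE_INFINITY;
            for (Double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                // relax all out-edges with the improved distance
                for (Edge<Long, Double> e : getEdges()) {
                    sendMessageTo(e.getTarget(), minDistance + e.getValue());
                }
            }
        }
    }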
> > > > > > >> I have not run any experiments yet and the prototype has
> > > > > > >> some ugly hacks, but if you think any of this makes sense,
> > > > > > >> then I'd be willing to follow up and try to optimize it.
> > > > > > >> If we see that it performs well, we can consider either
> > > > > > >> replacing Spargel or adding it as an alternative.
> > > > > > >>
> > > > > > >> Thanks for reading this long e-mail and looking forward to
> > > > > > >> your input!
> > > > > > >>
> > > > > > >> Cheers,
> > > > > > >> -Vasia.
> > > > > > >>
> > > > > > >> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > > > > > >> [2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > > > > > >> [3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > > > > > >> [4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java