flink-user mailing list archives

From Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject Re: Questions about the V-C Iteration in Gelly
Date Tue, 14 Feb 2017 16:12:21 GMT
Dear Xingcan,

no need to apologize, we are here to help :) You are always welcome to ask
questions / make suggestions.

Cheers,
-Vasia.

On 14 February 2017 at 09:35, Xingcan Cui <xingcanc@gmail.com> wrote:

> Hi Vasia,
>
> sorry, I should have read the archive first (it had already been posted in
> FLINK-1526, though in an ugly format). Now everything's clear, and I think
> this thread can be closed here.
>
> Thanks. @Vasia @Greg
>
> Best,
> Xingcan
>
> On Tue, Feb 14, 2017 at 3:55 PM, Vasiliki Kalavri <
> vasilikikalavri@gmail.com> wrote:
>
>> Hi Xingcan,
>>
>> that's my bad, I was thinking of scatter-gather iterations in my previous
>> reply. You're right, in VertexCentricIteration a vertex is only active in
>> the next superstep if it has received at least one message in the current
>> superstep. Updating its value does not impact the activation. This is
>> intentional in the vertex-centric model.
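To make the rule concrete, here is a toy, single-threaded Java model of the behaviour described above. It is not Gelly code: the class and method names are made up for illustration, and letting every vertex run in the first superstep is a simplifying assumption. A vertex is scheduled only if it received a message in the previous superstep, a value update alone schedules nothing, and the loop ends as soon as a superstep sends no messages (the empty workset).

```java
import java.util.*;

// Toy model of the vertex-centric activation rule (not Gelly code).
// Assumption: all vertices are active in superstep 1. Afterwards a vertex
// runs only if it received at least one message in the previous superstep.
final class ActivationModel {

    /**
     * Min-value propagation along directed edges. Each active vertex takes the
     * minimum of its value and its messages and forwards the new value only if
     * it decreased. Returns the number of supersteps executed.
     */
    static int run(Map<Integer, List<Integer>> edges, Map<Integer, Integer> values) {
        // Superstep 1: every vertex sends its initial value to its neighbors.
        Map<Integer, List<Integer>> inbox = new HashMap<>();
        for (Map.Entry<Integer, Integer> v : values.entrySet()) {
            for (int target : edges.getOrDefault(v.getKey(), List.of())) {
                inbox.computeIfAbsent(target, k -> new ArrayList<>()).add(v.getValue());
            }
        }
        int supersteps = 1;
        while (!inbox.isEmpty()) {            // empty "workset" => terminate
            supersteps++;
            Map<Integer, List<Integer>> next = new HashMap<>();
            // Only vertices that received messages are active in this superstep.
            for (Map.Entry<Integer, List<Integer>> e : inbox.entrySet()) {
                int id = e.getKey();
                int min = Collections.min(e.getValue());
                if (min < values.get(id)) {
                    values.put(id, min);      // a value update alone activates nothing
                    for (int target : edges.getOrDefault(id, List.of())) {
                        next.computeIfAbsent(target, k -> new ArrayList<>()).add(min);
                    }
                }
            }
            inbox = next;
        }
        return supersteps;
    }
}
```

On the chain 1 → 2 → 3 with values {1, 5, 9}, vertex 3 is scheduled only because vertex 2 keeps sending it messages; once nobody sends, the iteration stops.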
>>
>> I agree that the current design of the iterative models is restrictive
>> and doesn't allow for the expression of complex iterative algorithms that
>> require updating edges or defining phases. We have discussed this before,
>> e.g. in [1]. The outcome of that discussion was that we should use for-loop
>> iterations for such cases, as the closed-loop iteration operators of Flink
>> might not provide the necessary flexibility. As you will see in the thread
>> though, that proposal didn't work out, as efficiently supporting for-loops
>> in Flink is not possible right now.
>>
>> -Vasia.
>>
>> [1]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Gelly-iteration-abstractions-td3949.html
>>
>> On 14 February 2017 at 08:10, Xingcan Cui <xingcanc@gmail.com> wrote:
>>
>>> Hi Greg,
>>>
>>> I also found that in VertexCentricIteration.java, the message set is
>>> used as the workset, while the vertex set is used as the delta for the
>>> solution set. As a result, setNewVertexValue() does not actually
>>> activate a vertex. In other words, if no message is generated (the
>>> workset is empty), the "pact.runtime.workset-empty-aggregator" detects
>>> convergence of the delta iteration and the iteration simply terminates.
>>> Is this a bug?
>>>
>>> Best,
>>> Xingcan
>>>
>>>
>>> On Mon, Feb 13, 2017 at 5:24 PM, Xingcan Cui <xingcanc@gmail.com> wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> Thanks for your attention.
>>>>
>>>> It took me a little time to read the old PR on FLINK-1885. Although
>>>> VertexCentricIteration and its related classes have been refactored,
>>>> I understand what Markus wants to achieve.
>>>>
>>>> I am not sure whether using a bulk iteration instead of a delta one
>>>> would eliminate the "out of memory" problem. Apart from that, I think
>>>> the "auto update" has nothing to do with the bulk mode. Keeping backward
>>>> compatibility in mind, here are my suggestions to improve Gelly's
>>>> iteration API:
>>>>
>>>> 1) Add an "autoHalt" flag to the ComputeFunction.
>>>>
>>>> 2) If the flag is set to true (the default), apply the current mechanism.
>>>>
>>>> 3) If the flag is set to false, call out.collect() to update the vertex
>>>> value whether or not setNewVertexValue() is called, unless the user
>>>> explicitly calls a (newly added) voteToHalt() method in the
>>>> ComputeFunction.
>>>>
>>>> By adding these, users can decide when to halt a vertex themselves.
>>>> What do you think?
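A hypothetical sketch of how the proposed flag could change scheduling. Neither `autoHalt` nor `voteToHalt()` exists in Gelly; both come from the proposal above, and the rule that an incoming message wakes up a halted vertex is an assumption borrowed from Pregel-style systems. The class only decides which vertices run in the next superstep.

```java
import java.util.*;

// Hypothetical scheduler for the proposed autoHalt flag. autoHalt and
// voteToHalt() are NOT part of Gelly's ComputeFunction; they model the proposal.
final class HaltingModel {
    final boolean autoHalt;
    private final Set<Integer> halted = new HashSet<>();

    HaltingModel(boolean autoHalt) { this.autoHalt = autoHalt; }

    /** Hypothetical: a vertex asks not to be scheduled any more. */
    void voteToHalt(int vertexId) { halted.add(vertexId); }

    /** Returns the vertices that should run in the next superstep. */
    Set<Integer> nextActive(Set<Integer> allVertices, Set<Integer> messageTargets) {
        // Message recipients always run (this alone is the current behaviour).
        Set<Integer> active = new TreeSet<>(messageTargets);
        if (!autoHalt) {
            // Proposed behaviour: vertices stay active until they vote to halt.
            for (int v : allVertices) {
                if (!halted.contains(v)) active.add(v);
            }
        }
        // Assumption (Pregel-style): a message wakes a halted vertex up again.
        halted.removeAll(messageTargets);
        return active;
    }
}
```

With `autoHalt = true` the result is just the set of message recipients; with `false`, finished vertices can idle between phases without being dropped, which is the case described in b) further down the thread.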
>>>>
>>>> As for the "update edge values during vertex iterations" problem, I
>>>> think it needs a redesign of the Gelly framework (maybe merge the
>>>> vertices and edges into a single data set? Or just change the
>>>> iterations' implementation? I can't think it through clearly right now),
>>>> so I'll leave it at that for now. Besides, I don't think anyone would
>>>> really love to write a graph algorithm with native Flink operators;
>>>> that's why Gelly was designed, isn't it?
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>> On Fri, Feb 10, 2017 at 10:31 PM, Greg Hogan <code@greghogan.com>
>>>> wrote:
>>>>
>>>>> Hi Xingcan,
>>>>>
>>>>> FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.
>>>>>
>>>>> As an alternative, you could implement your algorithm with Flink
>>>>> operators and a bulk iteration. Most of the Gelly library is written
>>>>> with native operators.
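For comparison, a plain-Java analogue of the bulk-iteration shape suggested above (not Flink code). In Flink's DataSet API the same structure would roughly be `initial.iterate(maxIterations)` closed with `closeWith(next, terminationCriterion)`; the connected-components example and the "stop when nothing changed" criterion are choices made for this sketch. Every vertex is recomputed in every round, so no vertex can fall asleep between phases, at the cost of touching the whole data set each time.

```java
import java.util.*;

// Plain-Java analogue of a bulk iteration (not Flink code): each round maps the
// complete previous state to a complete new state, so every vertex takes part
// in every round and nothing is ever halted.
final class BulkIterationModel {

    /** Min-label propagation (connected components) over undirected edges. */
    static Map<Integer, Integer> components(int[][] edges, Set<Integer> vertices, int maxIterations) {
        Map<Integer, Integer> labels = new TreeMap<>();
        for (int v : vertices) labels.put(v, v);       // initial label = own id
        for (int i = 0; i < maxIterations; i++) {
            Map<Integer, Integer> next = new TreeMap<>(labels); // full new state
            for (int[] e : edges) {
                // Undirected: each endpoint may adopt the other's smaller label.
                next.merge(e[0], labels.get(e[1]), Math::min);
                next.merge(e[1], labels.get(e[0]), Math::min);
            }
            if (next.equals(labels)) break;             // criterion: no change
            labels = next;
        }
        return labels;
    }
}
```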
>>>>>
>>>>> Greg
>>>>>
>>>>> On Fri, Feb 10, 2017 at 5:02 AM, Xingcan Cui <xingcanc@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vasia,
>>>>>>
>>>>>> b) As I said, when some vertices have finished their work in the
>>>>>> current phase, they have nothing to do (no value updates, no messages
>>>>>> received, as if asleep) except wait for the other vertices that have
>>>>>> not yet finished the current phase. Then, in the next phase, all the
>>>>>> vertices should go back to work, and if some vertices became inactive
>>>>>> in the last phase, it could be hard to reactivate them with messages,
>>>>>> since we don't even know which vertices to send them to. The only
>>>>>> solution is to keep all vertices active, either by updating the vertex
>>>>>> values in every superstep or by having vertices send heartbeat
>>>>>> messages to themselves (which brings a lot of extra work to the
>>>>>> MessageCombiner).
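A toy illustration of the extra combiner work the heartbeat workaround causes. The names are hypothetical and this is not Gelly's `MessageCombiner` API: once vertices send themselves dummy messages, a min-combiner has to tell heartbeats apart from real values and must never let a heartbeat win over a real message.

```java
import java.util.*;

// Toy model (hypothetical names, not Gelly's MessageCombiner): a message is
// either a real value or a heartbeat whose only purpose is to keep a vertex active.
final class HeartbeatModel {

    static final class Msg {
        final boolean heartbeat;
        final long value;
        private Msg(boolean heartbeat, long value) { this.heartbeat = heartbeat; this.value = value; }
        static Msg heartbeat() { return new Msg(true, Long.MAX_VALUE); }
        static Msg of(long value) { return new Msg(false, value); }
    }

    /**
     * Min-combiner that special-cases heartbeats: returns the smallest real
     * message if there is one, otherwise a single heartbeat (null if empty).
     */
    static Msg combine(List<Msg> messages) {
        Msg best = null;
        for (Msg m : messages) {
            if (m.heartbeat) {
                if (best == null) best = m;          // keep one heartbeat as fallback
            } else if (best == null || best.heartbeat || m.value < best.value) {
                best = m;                            // real values always beat heartbeats
            }
        }
        return best;
    }
}
```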
>>>>>>
>>>>>> c) I know it's not elegant, maybe even an awful idea, to store edge
>>>>>> information in vertex values. However, we cannot change edge values
>>>>>> or maintain state (even a picked/unpicked mark) on edges during a
>>>>>> vertex-centric iteration. So what can we do if an algorithm really
>>>>>> needs that?
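A sketch of the workaround mentioned in c), using a hypothetical `VertexState` class as the vertex value: each vertex keeps its own copy of its out-edges plus a picked/unpicked mark and updates that copy instead of the (immutable) edge values. Note that the value grows with the vertex degree, which is exactly the cost raised later in the thread for high-degree vertices.

```java
import java.util.*;

// Hypothetical vertex value (not Gelly API): a private copy of the vertex's
// out-edges plus a per-edge "picked" mark that the vertex can change freely.
final class VertexState {
    /** Neighbor id -> edge weight, copied from the graph before the iteration. */
    final Map<Long, Double> edgeWeights = new TreeMap<>();
    /** Neighbor id -> whether this edge has been picked (e.g. for an MST). */
    final Map<Long, Boolean> picked = new TreeMap<>();

    void addEdge(long target, double weight) {
        edgeWeights.put(target, weight);
        picked.put(target, false);
    }

    /** Marks the lightest unpicked edge as picked and returns its target, or -1. */
    long pickLightestEdge() {
        long bestTarget = -1;
        double bestWeight = Double.POSITIVE_INFINITY;
        for (Map.Entry<Long, Double> e : edgeWeights.entrySet()) {
            if (!picked.get(e.getKey()) && e.getValue() < bestWeight) {
                bestWeight = e.getValue();
                bestTarget = e.getKey();
            }
        }
        if (bestTarget != -1) picked.put(bestTarget, true);
        return bestTarget;
    }
}
```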
>>>>>>
>>>>>> Thanks for your patience.
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>> On Fri, Feb 10, 2017 at 4:50 PM, Vasiliki Kalavri <
>>>>>> vasilikikalavri@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xingcan,
>>>>>>>
>>>>>>> On 9 February 2017 at 18:16, Xingcan Cui <xingcanc@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vasia,
>>>>>>>>
>>>>>>>> thanks for your reply. It helped a lot and gave me some new ideas.
>>>>>>>>
>>>>>>>> a) As you said, I did use the getPreviousIterationAggregate()
>>>>>>>> method in preSuperstep() of the next superstep. However, if the
>>>>>>>> (only?) global (aggregate) results cannot be guaranteed to be
>>>>>>>> consistent, what should we do with the postSuperstep() method?
>>>>>>>>
>>>>>>>
>>>>>>> The postSuperstep() method is analogous to the close() method in a
>>>>>>> RichFunction, which is typically used for cleanup.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> b) Though we can activate vertices via the update method or
>>>>>>>> messages, IMO it may be more appropriate for users themselves to
>>>>>>>> decide when to halt a vertex's iteration. Consider a complex
>>>>>>>> algorithm that contains different phases inside a vertex-centric
>>>>>>>> iteration. Before moving to the next phase (which should be
>>>>>>>> synchronized), some vertices may have already finished their work in
>>>>>>>> the current phase and are just waiting for the others. Users may want
>>>>>>>> the finished vertices to idle until the next phase rather than halt
>>>>>>>> them. Could we consider adding a voteToHalt() method and some
>>>>>>>> internal variables to the Vertex/Edge classes (or just creating
>>>>>>>> "advanced" versions of them) to make halting more controllable?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I suppose adding a voteToHalt() method is possible, but I'm not sure
>>>>>>> I see how that would make halting more controllable. If a vertex
>>>>>>> hasn't changed its value or hasn't received a message, it has no work
>>>>>>> to do in the next iteration, so why keep it active? If a previously
>>>>>>> inactive vertex receives a message in a later superstep, it will
>>>>>>> become active again. Is this what you're looking for, or am I missing
>>>>>>> something?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> c) Sorry I didn't make it clear before. By initialization I mean a
>>>>>>>> "global" one that executes once before the iteration. For example,
>>>>>>>> users may want to initialize the vertex values from their adjacent
>>>>>>>> edges before the iteration starts. Maybe we could add an extra
>>>>>>>> CoGroupFunction to the configuration parameters and apply it before
>>>>>>>> the iteration?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> You can initialize the graph by using any Gelly transformation
>>>>>>> methods before starting the iteration, e.g. mapVertices, mapEdges,
>>>>>>> reduceOnEdges, etc.
>>>>>>> Btw, a vertex can iterate over its edges inside the ComputeFunction
>>>>>>> using the getEdges() method. Initializing the vertex values with
>>>>>>> neighboring edges might not be a good idea if you have vertices with
>>>>>>> high degrees.
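A plain-Java model of such a one-off initialization done before the iteration (not Gelly code). In Gelly this would roughly correspond to `reduceOnEdges(..., EdgeDirection.OUT)` followed by `joinWithVertices(...)`; using the minimum out-edge weight as the initial value is just an example choice for this sketch.

```java
import java.util.*;

// Plain-Java model of "reduce each vertex's out-edges, then join the result
// into the vertex values" (not Gelly code). Edges are {source, target, weight}.
final class EdgeInitModel {

    static Map<Long, Long> initFromMinOutEdge(Map<Long, Long> vertexValues, long[][] edges) {
        // Analogue of reduceOnEdges: one minimum weight per source vertex.
        Map<Long, Long> minOut = new HashMap<>();
        for (long[] e : edges) {
            minOut.merge(e[0], e[2], Math::min);
        }
        // Analogue of joinWithVertices: replace a value only where a reduced
        // result exists; vertices without out-edges keep their old value.
        Map<Long, Long> result = new TreeMap<>(vertexValues);
        for (Map.Entry<Long, Long> m : minOut.entrySet()) {
            result.computeIfPresent(m.getKey(), (id, old) -> m.getValue());
        }
        return result;
    }
}
```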
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Vasia.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> (BTW, I started a PR on FLINK-1526 (MST Lib&Example). Given the
>>>>>>>> complexity, the example is not provided.)
>>>>>>>>
>>>>>>>> I really appreciate all your help.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Xingcan
>>>>>>>>
>>>>>>>> On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri <
>>>>>>>> vasilikikalavri@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xingcan,
>>>>>>>>>
>>>>>>>>> On 7 February 2017 at 10:10, Xingcan Cui <xingcanc@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I have some questions about the vertex-centric iteration in Gelly.
>>>>>>>>>>
>>>>>>>>>> a) It seems the postSuperstep() method is called before the
>>>>>>>>>> superstep barrier (I got different aggregate values for the same
>>>>>>>>>> superstep in this method). Is this a bug, or is it by design?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The postSuperstep() method is called inside the close() method of
>>>>>>>>> a RichCoGroupFunction that wraps the ComputeFunction. The close()
>>>>>>>>> method is called after the last call to coGroup() in each
>>>>>>>>> iteration superstep.
>>>>>>>>> The aggregate values are not guaranteed to be consistent during
>>>>>>>>> the same superstep in which they are computed. To retrieve an
>>>>>>>>> aggregate value for superstep i, you should use the
>>>>>>>>> getPreviousIterationAggregate() method in superstep i+1.
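A toy model (not Flink's implementation) of why the aggregate is only reliable one superstep later: each parallel task aggregates into its own partial value, and the partials are merged at the superstep barrier. `previousAggregate()` plays the role of `getPreviousIterationAggregate()`; all other names are made up for illustration.

```java
import java.util.*;

// Toy model of per-task partial aggregates merged at the superstep barrier.
final class AggregatorModel {
    private final long[] partials;                 // one partial sum per parallel task
    private final List<Long> finished = new ArrayList<>(); // merged value per superstep

    AggregatorModel(int parallelism) { partials = new long[parallelism]; }

    /** Called by a task while it processes its share of the vertices. */
    void aggregate(int task, long value) { partials[task] += value; }

    /** What a task can see before the barrier (e.g. in postSuperstep()): only its own partial. */
    long localView(int task) { return partials[task]; }

    /** The barrier: merge all partials and reset them for the next superstep. */
    void barrier() {
        long total = 0;
        for (int t = 0; t < partials.length; t++) {
            total += partials[t];
            partials[t] = 0;
        }
        finished.add(total);
    }

    /** Analogue of getPreviousIterationAggregate(): merged value of the last superstep. */
    long previousAggregate() { return finished.get(finished.size() - 1); }
}
```

Two tasks reading their local views in the same superstep see different numbers (3 and 4 in the test below), which matches the inconsistency reported in a); only after the barrier is the merged value (7) available.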
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> b) There is no setHalt method for vertices. When no message is
>>>>>>>>>> received, a vertex just drops out of the next iteration. Should I
>>>>>>>>>> manually send messages (like heartbeats) to keep the vertices
>>>>>>>>>> active?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That's because vertex halting is implicitly controlled by the
>>>>>>>>> underlying delta iterations of Flink. A vertex will remain active
>>>>>>>>> as long as it receives a message or updates its value; otherwise
>>>>>>>>> it will become inactive. The documentation on Gelly iterations [1]
>>>>>>>>> and DataSet iterations [2] might be helpful.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> c) I think we may need an initialization method in the
>>>>>>>>>> ComputeFunction.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> There is a preSuperstep() method for initialization. It is
>>>>>>>>> executed once per superstep, before the compute function is
>>>>>>>>> invoked for every vertex. Would this work for you?
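A toy reconstruction of the per-task call order described in this thread (not the actual Gelly wrapper): within each superstep a task runs `preSuperstep()` once, then `compute()` once per active vertex (the wrapped `coGroup()` calls), then `postSuperstep()` from `close()`, all before any barrier merges results across tasks. Recording the calls in a list is just a way to make the order visible.

```java
import java.util.*;

// Toy model of one parallel task's lifecycle across supersteps (not Gelly code).
final class LifecycleModel {
    final List<String> calls = new ArrayList<>();

    void preSuperstep(int step)          { calls.add("pre" + step); }
    void compute(int step, int vertexId) { calls.add("compute" + step + ":" + vertexId); }
    void postSuperstep(int step)         { calls.add("post" + step); }

    /** Simulates one task over several supersteps with a fixed set of active vertices. */
    void runTask(int supersteps, List<Integer> activeVertices) {
        for (int step = 1; step <= supersteps; step++) {
            preSuperstep(step);                            // once, before any compute()
            for (int v : activeVertices) compute(step, v); // one call per active vertex
            postSuperstep(step);                           // from close(), before the barrier
        }
    }
}
```

Because `preSuperstep()` runs once per superstep rather than once per iteration, a truly one-off initialization is better done with graph transformations before the iteration starts, as suggested elsewhere in the thread.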
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any opinions? Thanks.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Xingcan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> I hope this helps,
>>>>>>>>> -Vasia.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
>>>>>>>>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/iterations.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
