ignite-user mailing list archives

From Andrey Kornev <andrewkor...@hotmail.com>
Subject RE: Computation on NodeEntries
Date Tue, 15 Dec 2015 01:29:22 GMT
Dmitriy, thanks!

The issue here is not related to the state of the partitions, but to the fact that launching
the local queries on the individual nodes is not atomic with respect to cluster topology
changes, which makes the race described by Denis possible.

I'm looking for a correct way to handle this situation.


From: dsetrakyan@apache.org
Date: Mon, 14 Dec 2015 16:03:50 -0800
Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org

On Mon, Dec 14, 2015 at 3:16 PM, Andrey Kornev <andrewkornev@hotmail.com> wrote:

Hey Denis,

Thanks for your reply! The race you've described is definitely possible.

However, it seems SqlQuery would also be vulnerable to the same race condition. I broadcast
the tasks to all nodes and while each task is trying to start a local SqlQuery on the node
a new node joins the cluster. Wouldn't I run into the same issue? 

Andrey, SqlQuery or ScanQuery should guarantee that you are running on locked partitions
which are in a complete state. If a partition has not been filled yet, e.g. due to a node
just joining, then another copy of that partition, which is in a complete state on another
node, will be picked for the query. 
I'd like to emphasize that I'm not talking about a single node launching a query against the
cluster. I'd like to process my data on each node in parallel, but only the data that matches
a certain SQL query.


Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org
From: dmagda@gridgain.com
Date: Mon, 14 Dec 2015 10:53:04 +0300

    Hi Andrey,


    Please see below.


    On 12/13/2015 9:08 AM, Andrey Kornev wrote:



        If partitions do not migrate while they are being iterated
          over, wouldn't it then suffice to simply execute a single
          ScanQuery with its isLocal set to true? My reasoning here is
          that the scan would create an iterator for all affinity
          partitions, thus preventing their migration. If that's not the
          case, how would a local ScanQuery behave in the presence of
          topology changes?
    From what I see in the code, setting ScanQuery's isLocal to 'true'
    gives the ability to iterate over all the partitions that belonged to
    the node at the time the query was started. None of those partitions
    will be moved to another node until the query's iterator is closed.


    However, here I see the following issue. Imagine that your cluster
    has two nodes, you decide to iterate over all the local partitions
    of both nodes, and the execution sequence looks like this:


    1) ScanQuery with isLocal=true started executing on node A.
    All the partitions are blocked and won't be moved.

    2) Node B receives the same compute job with the ScanQuery
    in it. However, because of the OS scheduler, the thread that is in
    charge of starting the query is blocked for some time. So the iterator
    over the local partitions is not ready yet and the partitions are not
    blocked.
    3) A third node, C, joins the topology. Partitions that are
    owned by node B may be rebalanced between nodes A and C. 

    4) Partitions that are rebalanced from node B to node A
    won't be visited by your code, because node A's iterator was already
    built while node B's iterator is constructed only after the rebalancing.


    This issue can't happen when you specify the partitions explicitly
    using Yakov's approach below, because in the worst case, in a
    situation like the above, a just-rebalanced partition's data will
    be sent over the network to the node that was the owner of the
    partition at the time you calculated the partition owners.
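To make the race above concrete, here is a plain-Java simulation of the four steps. This is not Ignite API; the node names, partition numbers, and the `missedPartitions` helper are all made up for illustration.

```java
import java.util.*;

public class RebalanceRaceDemo {
    // Returns the partitions out of [0, totalParts) that neither job visits.
    static Set<Integer> missedPartitions(Set<Integer> visitedByA,
                                         Set<Integer> visitedByB,
                                         int totalParts) {
        Set<Integer> missed = new TreeSet<>();
        for (int p = 0; p < totalParts; p++)
            if (!visitedByA.contains(p) && !visitedByB.contains(p))
                missed.add(p);
        return missed;
    }

    public static void main(String[] args) {
        // Before node C joins: A owns {0,1,2,3}, B owns {4,5,6,7}.
        // 1) Node A builds its local iterator first, pinning exactly these partitions:
        Set<Integer> visitedByA = new TreeSet<>(Arrays.asList(0, 1, 2, 3));

        // 2) Node B's job is delayed by the scheduler.
        // 3) Node C joins; B's partitions rebalance: say 5 moves to A, 7 moves to C.
        // 4) Node B's iterator is built only now, over its post-rebalance set:
        Set<Integer> visitedByB = new TreeSet<>(Arrays.asList(4, 6));

        // Partition 5 (now on A, whose iterator predates the move) and partition 7
        // (now on C, which never received a job) are silently skipped.
        System.out.println("missed: " + missedPartitions(visitedByA, visitedByB, 8));
        // prints: missed: [5, 7]
    }
}
```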




        Also, what's the best way to handle topology changes while
          using the SqlQuery rather than ScanQuery? Basically, it's the
          same use case, only instead of scanning the entire partition
          I'd like to first filter the cache entries using a query.

    SqlQueries work transparently for you and are guaranteed to return a
    full and consistent result set even if the topology changes while a
    query is in progress.





        From: Yakov Zhdanov <yzhdanov@apache.org>

        Sent: Friday, December 11, 2015 10:55 AM

        Subject: RE: Computation on NodeEntries

        To: <user@ignite.apache.org>



        A partition will not migrate while a local or remote
          iterator over it has not been finished/closed.
         On Dec 11, 2015 21:05, "Andrey Kornev"
          < andrewkornev@hotmail.com> wrote:

               Great suggestion! Thank you, Yakov! 


                Just one more question. :) Let's say the scan job is
                 running on node A and processing partition 42. At the same
                time, a new node B joins and partition 42 needs to be
                moved to this node. What will happen to my scan query
                that is still running on node A and iterating over the
                partition's entries? Would it complete processing the
                entire partition despite the change of ownership? Or,
                would the query terminate at some arbitrary point once
                the partition ownership transfer has completed? 


                Thanks a lot! 



                  Date: Fri, 11 Dec 2015 16:06:16 +0300 

                  Subject: Re: Computation on NodeEntries 

                  From: yzhdanov@apache.org

                  To: user@ignite.apache.org


                   Guys, I would do the following:

                     1. Map all my partitions to nodes:
                        org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
                     2. Send jobs (each with its list of partitions) to
                        each node using the map returned in step 1
                     3. A job may look like:

    new Runnable() {
        @Override public void run() {
            for (Integer part : parts) {
                Iterator<Cache.Entry<Object, Object>> it =
                    cache.query(new ScanQuery<>(part)).iterator();

                // do the stuff...
            }
        }
    }
                      This may result in network calls in some worst cases when the topology
changes under your feet, but even in those cases it should work.
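The key property of Yakov's scheme is that partition ownership is computed once, up front, and each job scans only the partitions it was handed, so together the jobs cover every partition exactly once regardless of later rebalancing. A plain-Java sketch of that decomposition (the Ignite calls are replaced by stand-ins; the round-robin assignment and node names are invented for the example):

```java
import java.util.*;

public class PartitionFanOutSketch {
    // Stand-in for Affinity#mapPartitionsToNodes: node id -> partitions it owns.
    static Map<String, List<Integer>> mapPartitionsToNodes(int partCount, List<String> nodes) {
        Map<String, List<Integer>> map = new LinkedHashMap<>();
        for (String node : nodes)
            map.put(node, new ArrayList<>());
        for (int p = 0; p < partCount; p++)
            map.get(nodes.get(p % nodes.size())).add(p); // round-robin assignment
        return map;
    }

    public static void main(String[] args) {
        // Step 1: snapshot the partition -> node mapping once.
        Map<String, List<Integer>> map = mapPartitionsToNodes(8, Arrays.asList("A", "B"));

        // Step 2/3: one job per node; each job scans only its own partition list.
        List<Integer> processed = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : map.entrySet())
            for (int part : e.getValue())
                processed.add(part); // stand-in for cache.query(new ScanQuery<>(part))

        Collections.sort(processed);
        System.out.println(processed); // [0, 1, 2, 3, 4, 5, 6, 7] -- each partition once
    }
}
```

Even if a partition moves after the snapshot, its job still names it explicitly, which is why the worst case is a remote fetch rather than a missed partition.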


                       2015-12-11 2:13 GMT+03:00 Andrey Kornev <andrewkornev@hotmail.com>:



                              Given the approach you suggested below,
                              what would be your recommendation for
                              dealing with cluster topology changes
                              while the iteration is in progress? An
                              obvious one I can think of is to 

                              - somehow detect the change, 

                              - cancel the tasks on all the nodes 

                              - wait until the rebalancing is finished

                              - restart the computation. 


                              Are there any other ways? Ideally, I'd
                              like to have "exactly-once" execution
                              semantics.
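One way to approximate the detect-and-restart loop above is to compare a topology version before and after each run and retry until a run completes with the version unchanged (Ignite exposes such a counter via IgniteCluster#topologyVersion). The sketch below is plain Java with a fake version supplier; `runToCompletion` is a hypothetical helper, and note this gives at-least-once rather than exactly-once execution unless the job's effects are idempotent or transactional.

```java
import java.util.function.LongSupplier;

public class RetryOnTopologyChange {
    // Re-runs 'job' until a run completes with the same topology version
    // before and after, i.e. without a concurrent topology change.
    static int runToCompletion(LongSupplier topVer, Runnable job, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long before = topVer.getAsLong();
            job.run();
            if (topVer.getAsLong() == before)
                return attempt; // stable run: results can be trusted
            // else: a node joined/left mid-run; discard results and retry
        }
        throw new IllegalStateException("topology kept changing");
    }

    public static void main(String[] args) {
        // Simulated topology: the version bumps once, during the first run only.
        long[] ver = {1};
        boolean[] bumped = {false};

        int attempts = runToCompletion(
            () -> ver[0],
            () -> { if (!bumped[0]) { ver[0]++; bumped[0] = true; } },
            5);

        System.out.println("succeeded on attempt " + attempts); // attempt 2
    }
}
```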






