ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Roman Kondakov <kondako...@mail.ru.INVALID>
Subject Re: Historical rebalance
Date Mon, 03 Dec 2018 11:22:07 GMT
Vladimir,

the difference between per-transaction basis and update-counters basis 
is the fact that at the first update we don't know the actual update 
counter of this update - we just count deltas on enlist phase. Actual 
update counter of this update will be assigned on transaction commit. 
But for per-transaction based the actual HWM is known for each 
transaction from the very beginning and this value is the same for 
primary and backups. Having this number it is very easy to find where 
transaction begins on any node.


-- 
Kind Regards
Roman Kondakov

On 03.12.2018 13:46, Vladimir Ozerov wrote:
> Roman,
>
> We already track updates on per-transaction basis. The only difference is
> that instead of doing a single "increment(1)" for transaction we do
> "increment(X)" where X is number of updates in the given transaction.
>
> On Mon, Dec 3, 2018 at 1:16 PM Roman Kondakov <kondakov87@mail.ru.invalid>
> wrote:
>
>> Igor, Vladimir, Ivan,
>>
>> perhaps, we are focused too much on update counters. This feature was
>> designed for the continuous queries and it may not be suited well for
>> the historical rebalance. What if we would track updates on
>> per-transaction basis instead of per-update basis? Let's consider two
>> counters: low-water mark (LWM) and high-water mark (HWM) which should be
>> added to each partition. They have the following properties:
>>
>> * HWM - is a plane atomic counter. When Tx makes its first write on
>> primary node it does incrementAndGet for this counter and remembers
>> obtained value within its context. This counter can be considered as tx
>> id within current partition - transactions should maintain per-partition
>> map of their HWM ids. WAL pointer to the first record should remembered
>> in this map. Also this id should be recorded to WAL data records.
>>
>> When Tx sends updates to backups it sends Tx HWM too. When backup
>> receives this message from the primary node it takes HWM and do
>> setIfGreater on the local HWM counter.
>>
>> * LWM - is a plane atomic counter. When Tx terminates (either with
>> commit or rollback) it updates its local LWM in the same manner as
>> update counters do it using holes tracking. For example, if partition's
>> LWM = 10 now, and tx with id (HWM id) = 12 commits, we do not update
>> partition LWM until tx with id = 11 is committed. When id = 11 is
>> committed, LWM is set to 12. If we have LWM == N, this means that all
>> transactions with id <= N have been terminated for the current partition
>> and all data is already recorded in the local partition.
>>
>> Brief summary for both counters: HWM - means that partition has already
>> seen at least one update of transactions with id <= HWM. LWM means that
>> partition has all updates made by transactions wth id <= LWM.
>>
>> LWM is always <= HWM.
>>
>> On checkpoint we should store only these two counters in checkpoint
>> record. As optimization we can also store list of pending LWMs - ids
>> which haven't been merged to LWM because of the holes in sequence.
>>
>> Historical rebalance:
>>
>> 1. Demander knows its LWM - all updates before it has been applied.
>> Demander sends LWM to supplier.
>>
>> 2. Supplier finds the earliest checkpoint where HWM(supplier) <= LWM
>> (demander)
>>
>> 3. Supplier starts moving forward on WAL until it finds first data
>> record with HWM id = LWM (demander). From this point WAL can be
>> rebalanced to demander.
>>
>> In this approach updates and checkpoints on primary and backup can be
>> reordered in any way, but we can always find a proper point to read WAL
>> from.
>>
>> Let's consider a couple of examples. In this examples transaction
>> updates marked as w1(a) - transaction 1 updates key=a, c1 - transaction
>> 1 is committed, cp(1, 0) - checkpoint with HWM=1 and  LWM=0. (HWM,LWM) -
>> current counters after operation. (HWM,LWM[hole1, hole2]) - counters
>> with holes in LWM.
>>
>>
>> 1. Simple case with no reordering:
>>
>> PRIMARY
>> -----w1(a)---cp(1,0)---w2(b)----w1(c)--------------c1--------c2-----cp(2,2)
>> (HWM,LWM)    (1,0)             (2,0)    (2,0)             (2,1)     (2,2)
>>                 |                  |        |                 |        |
>> BACKUP
>> ------w1(a)-------------w2(b)----w1(c)---cp(2,0)----c1--------c2-----cp(2,2)
>> (HWM,LWM)    (1,0)             (2,0)    (2,0)             (2,1)     (2,2)
>>
>>
>> In this case if backup failed before c1 it will receive all updates from
>> the beginning (HWM=0).
>> If it fails between c1 and c2, it will receive WAL from primary's cp(1,0),
>> because tx with id=1 is fully processed on backup: HWM(supplier cp(1,0))=1
>> == LWM(demander)=1
>> if backup fails after c2, it will receive nothing because it has all
>> updates HWM(supplier)=2 == LWM(demander)=2
>>
>>
>>
>> 2. Case with reordering
>>
>> PRIMARY
>> -----w1(a)---cp(1,0)---w2(b)------cp(2,0)----------w1(c)------c1-----c2-----------cp(2,2)
>> (HWM,LWM)    (1,0)             (2,0)                       (2,0)
>>   (2,1)  (2,2)
>>                   \_____           |                           |
>> \   |
>>                         \_______   |                           |
>>   \__|_______
>>                                 \__|______                     |
>>    |       \
>>                                    |      \                    |
>>    |        \
>> BACKUP
>> -------------------------w2(b)---w1(a)----cp(2,0)---w1(c)------------c2-------c1-----cp(2,2)
>> (HWM,LWM)                       (2,0)   (2,0)              (2,0)
>> (2,0[2])  (2,2)
>>
>>
>> Note here we have a hole on backup when tx2 has committed earlier than tx1
>> and LWM wasn't changed at this moment.
>>
>> In last case if backup is failed before c1, the entire WAL will be
>> supplied because LWM=0 until this moment.
>> If backup fails after c1 - there is nothing to rebalance, because
>> HWM(supplier)=2 == LWM(demander)=2
>>
>>
>> What do you think?
>>
>>
>> --
>> Kind Regards
>> Roman Kondakov
>>
>> On 30.11.2018 2:01, Seliverstov Igor wrote:
>>> Vladimir,
>>>
>>> Look at my example:
>>>
>>> One active transaction (Tx1 which does opX ops) while another tx (Tx2
>> which
>>> does opX' ops) is finishes with uc4:
>>>
>>>
>> ----uc1--op1----op2---uc2--op1'----uc3--uc4---op3------------X-------------
>>> Node1
>>>
>>>
>>>
>>> ----uc1----op1----uc2----op2----uc3--op3----------uc4----cp1----     Tx1
>> -
>>>                               ^         |                  |
>>>                                  |
>>>                                ------------------------
>>>                               | -Node2
>>>                                                             ^------
>>>                                  |
>>>                                                                     |
>>>                                    |
>>> ----uc1-------------uc2-------------uc3--------op1'----uc4----cp1----
>>> Tx2 -
>>>
>>>
>>> state on Node2: tx1 -> op3 -> uc2
>>>                             cp1 [current=uc4, backpointer=uc2]
>>>
>>> Here op2 was acknowledged by op3, op3 was applied before op1' (linearized
>>> by WAL).
>>>
>>> All nodes having uc4 must have op1' because uc4 cannot be get earlier
>> than
>>> prepare stage while prepare stage happens after all updates so *op1'
>>> happens before uc4* regardless Tx2 was committed or rolled back.
>>>
>>> This means *op2 happens before uc4* (uc4 cannot be earlier op2 on any
>> node
>>> because on Node2 op2 was already finished (acknowledged by op3) when op1'
>>> happens)
>>>
>>> That was my idea which easy to proof.
>>>
>>> You used a different approach, but yes, It has to work.
>>>
>>> чт, 29 нояб. 2018 г. в 22:19, Vladimir Ozerov <vozerov@gridgain.com>:
>>>
>>>> "If more recent WAL records will contain *ALL* updates of the
>> transaction"
>>>> -> "More recent WAL records will contain *ALL* updates of the
>> transaction"
>>>> On Thu, Nov 29, 2018 at 10:15 PM Vladimir Ozerov <vozerov@gridgain.com>
>>>> wrote:
>>>>
>>>>> Igor,
>>>>>
>>>>> Yes, I tried to draw different configurations, and it really seems to
>>>>> work, despite of being very hard to proof due to non-inituitive HB
>> edges.
>>>>> So let me try to spell the algorithm once again to make sure that we
>> are
>>>> on
>>>>> the same page here.
>>>>>
>>>>> 1) There are two nodes - primary (P) and backup (B)
>>>>> 2) There are three type of events: small transactions which possibly
>>>>> increments update counter (ucX), one long active transaction which is
>>>> split
>>>>> into multiple operations (opX), and checkpoints (cpX)
>>>>> 3) Every node always has current update counter. When transaction
>> commits
>>>>> it may or may not shift this counter further depending on whether there
>>>> are
>>>>> holes behind. But we have a strict rule that it always grow. Higher
>>>>> coutners synchrnoizes with smaller. Possible cases:
>>>>> ----uc1----uc2----uc3----
>>>>> ----uc1--------uc3------- // uc2 missing due to reorder, but is is ok
>>>>>
>>>>> 4) Operations within a single transaction is always applied
>> sequentially,
>>>>> and hence also have HB edge:
>>>>> ----op1----op2----op3----
>>>>>
>>>>> 5) When transaction operation happens, we save in memory current update
>>>>> counter available at this moment. I.e. we have a map from transaction
>> ID
>>>> to
>>>>> update counter which was relevant by the time last *completed*
>> operation
>>>>> *started*. This is very important thing - we remember the counter when
>>>>> operation starts, but update the map only when it finishes. This is
>>>> needed
>>>>> for situation when update counter is bumber in the middle of a long
>>>>> operation.
>>>>> ----uc1----op1----op2----uc2----uc3----op3----
>>>>>               |      |                    |
>>>>>              uc1    uc1                  uc3
>>>>>
>>>>> state: tx1 -> op3 -> uc3
>>>>>
>>>>> 6) Whenever checkpoint occurs, we save two counters with: "current" and
>>>>> "backpointer". The latter is the smallest update counter associated
>> with
>>>>> active transactions. If there are no active transactions, current
>> update
>>>>> counter is used.
>>>>>
>>>>> Example 1: no active transactions.
>>>>> ----uc1----cp1----
>>>>>        ^      |
>>>>>        --------
>>>>>
>>>>> state: cp1 [current=uc1, backpointer=uc1]
>>>>>
>>>>> Example 2: one active transaction:
>>>>>                                    ---------------
>>>>>                                    |             |
>>>>> ----uc1----op1----uc2----op2----op3----uc3----cp1----
>>>>>                      ^             |
>>>>>                      --------------
>>>>>
>>>>> state: tx1 -> op3 -> uc2
>>>>>          cp1 [current=uc3, backpointer=uc2]
>>>>>
>>>>> 7) Historical rebalance:
>>>>> 7.1) Demander finds latest checkpoint, get it's backpointer and sends
>> it
>>>>> to supplier.
>>>>> 7.2) Supplier finds earliest checkpoint where [supplier(current) <=
>>>>> demander(backpointer)]
>>>>> 7.3) Supplier reads checkpoint backpointer and finds associated WAL
>>>>> record. This is where we start.
>>>>>
>>>>> So in terms of WAL we have: supplier[uc_backpointer <- cp(uc_current
<=
>>>>> demanter_uc_backpointer)] <- demander[uc_backpointer <- cp(last)]
>>>>>
>>>>> Now the most important - why it works :-)
>>>>> 1) Transaction opeartions are sequential, so at the time of crash nodes
>>>>> are *at most one operation ahead *each other
>>>>> 2) Demander goes to the past and finds update counter which was current
>>>> at
>>>>> the time of last TX completed operation
>>>>> 3) Supplier goes to the closest checkpoint in the past where this
>> update
>>>>> counter either doesn't exist or just appeared
>>>>> 4) Transaction cannot be committed on supplier at this checkpoint, as
>> it
>>>>> would violate UC happens-before rule
>>>>> 5) Tranasction may have not started yet on supplier at this point. If
>>>> more
>>>>> recent WAL records will contain *ALL* updates of the transaction
>>>>> 6) Transaction may exist on supplier at this checkpoint. Thanks to p.1
>> we
>>>>> must skip at most one operation. Jump back through supplier's
>> checkpoint
>>>>> backpointer is guaranteed to do this.
>>>>>
>>>>> Igor, do we have the same understanding here?
>>>>>
>>>>> Vladimir.
>>>>>
>>>>> On Thu, Nov 29, 2018 at 2:47 PM Seliverstov Igor <gvvinblade@gmail.com
>>>>> wrote:
>>>>>
>>>>>> Ivan,
>>>>>>
>>>>>> different transactions may be applied in different order on backup
>>>> nodes.
>>>>>> That's why we need an active tx set
>>>>>> and some sorting by their update times. The idea is to identify a
>> point
>>>> in
>>>>>> time which starting from we may lost some updates.
>>>>>> This point:
>>>>>>      1) is the last acknowledged by all backups (including possible
>>>> further
>>>>>> demander) update on timeline;
>>>>>>      2) have a specific update counter (aka back-counter) which we
>> going
>>>> to
>>>>>> start iteration from.
>>>>>>
>>>>>> After additional thinking on, I've identified a rule:
>>>>>>
>>>>>> There is two fences:
>>>>>>     1) update counter (UC) - this means that all updates, with less
UC
>>>> than
>>>>>> applied one, was applied on a node, having this UC.
>>>>>>     2) update in scope of TX - all updates are applied one by one
>>>>>> sequentially, this means that the fact of update guaranties the
>> previous
>>>>>> update (statement) was finished on all TX participants.
>>>>>>
>>>>>> Сombining them, we can say the next:
>>>>>>
>>>>>> All updates, that was acknowledged at the time the last update of
tx,
>>>>>> which
>>>>>> updated UC, applied, are guaranteed to be presented on a node having
>>>> such
>>>>>> UC
>>>>>>
>>>>>> We can use this rule to find an iterator start pointer.
>>>>>>
>>>>>> ср, 28 нояб. 2018 г. в 20:26, Павлухин Иван <vololo100@gmail.com>:
>>>>>>
>>>>>>> Guys,
>>>>>>>
>>>>>>> Another one idea. We can introduce additional update counter
which is
>>>>>>> incremented by MVCC transactions right after executing operation
>> (like
>>>>>>> is done for classic transactions). And we can use that counter
for
>>>>>>> searching needed WAL records. Can it did the trick?
>>>>>>>
>>>>>>> P.S. Mentally I am trying to separate facilities providing
>>>>>>> transactions and durability. And it seems to me that those facilities
>>>>>>> are in different dimensions.
>>>>>>> ср, 28 нояб. 2018 г. в 16:26, Павлухин Иван
<vololo100@gmail.com>:
>>>>>>>> Sorry, if it was stated that a SINGLE transaction updates
are
>>>> applied
>>>>>>>> in a same order on all replicas then I have no questions
so far. I
>>>>>>>> thought about reordering updates coming from different transactions.
>>>>>>>>> I have not got why we can assume that reordering is not
possible.
>>>>>> What
>>>>>>>> have I missed?
>>>>>>>> ср, 28 нояб. 2018 г. в 13:26, Павлухин Иван
<vololo100@gmail.com>:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding Vladimir's new idea.
>>>>>>>>>> We assume that transaction can be represented as
a set of
>>>>>>> independent operations, which are applied in the same order on
both
>>>>>> primary
>>>>>>> and backup nodes.
>>>>>>>>> I have not got why we can assume that reordering is not
possible.
>>>>>> What
>>>>>>>>> have I missed?
>>>>>>>>> вт, 27 нояб. 2018 г. в 14:42, Seliverstov Igor
<
>>>>>> gvvinblade@gmail.com>:
>>>>>>>>>> Vladimir,
>>>>>>>>>>
>>>>>>>>>> I think I got your point,
>>>>>>>>>>
>>>>>>>>>> It should work if we do the next:
>>>>>>>>>> introduce two structures: active list (txs) and candidate
list
>>>>>>> (updCntr ->
>>>>>>>>>> txn pairs)
>>>>>>>>>>
>>>>>>>>>> Track active txs, mapping them to actual update counter
at
>>>> update
>>>>>>> time.
>>>>>>>>>> On each next update put update counter, associated
with previous
>>>>>>> update,
>>>>>>>>>> into a candidates list possibly overwrite existing
value
>>>> (checking
>>>>>>> txn)
>>>>>>>>>> On tx finish remove tx from active list only if appropriate
>>>> update
>>>>>>> counter
>>>>>>>>>> (associated with finished tx) is applied.
>>>>>>>>>> On update counter update set the minimal update counter
from the
>>>>>>> candidates
>>>>>>>>>> list as a back-counter, clear the candidate list
and remove an
>>>>>>> associated
>>>>>>>>>> tx from the active list if present.
>>>>>>>>>> Use back-counter instead of actual update counter
in demand
>>>>>> message.
>>>>>>>>>> вт, 27 нояб. 2018 г. в 12:56, Seliverstov
Igor <
>>>>>> gvvinblade@gmail.com
>>>>>>>> :
>>>>>>>>>>> Ivan,
>>>>>>>>>>>
>>>>>>>>>>> 1) The list is saved on each checkpoint, wholly
(all
>>>>>> transactions
>>>>>>> in
>>>>>>>>>>> active state at checkpoint begin).
>>>>>>>>>>> We need whole the list to get oldest transaction
because after
>>>>>>>>>>> the previous oldest tx finishes, we need to get
the following
>>>>>> one.
>>>>>>>>>>> 2) I guess there is a description of how persistent
storage
>>>>>> works
>>>>>>> and how
>>>>>>>>>>> it restores [1]
>>>>>>>>>>>
>>>>>>>>>>> Vladimir,
>>>>>>>>>>>
>>>>>>>>>>> the whole list of what we going to store on checkpoint
>>>>>> (updated):
>>>>>>>>>>> 1) Partition counter low watermark (LWM)
>>>>>>>>>>> 2) WAL pointer of earliest active transaction
write to
>>>> partition
>>>>>>> at the
>>>>>>>>>>> time the checkpoint have started
>>>>>>>>>>> 3) List of prepared txs with acquired partition
counters
>>>> (which
>>>>>>> were
>>>>>>>>>>> acquired but not applied yet)
>>>>>>>>>>>
>>>>>>>>>>> This way we don't need any additional info in
demand message.
>>>>>>> Start point
>>>>>>>>>>> can be easily determined using stored WAL "back-pointer".
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/IGNITE/Ignite+Persistent+Store+-+under+the+hood#IgnitePersistentStore-underthehood-LocalRecoveryProcess
>>>>>>>>>>> вт, 27 нояб. 2018 г. в 11:19, Vladimir
Ozerov <
>>>>>>> vozerov@gridgain.com>:
>>>>>>>>>>>> Igor,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you please elaborate - what is the
whole set of
>>>>>> information
>>>>>>> we are
>>>>>>>>>>>> going to save at checkpoint time? From what
I understand this
>>>>>>> should be:
>>>>>>>>>>>> 1) List of active transactions with WAL pointers
of their
>>>> first
>>>>>>> writes
>>>>>>>>>>>> 2) List of prepared transactions with their
update counters
>>>>>>>>>>>> 3) Partition counter low watermark (LWM)
- the smallest
>>>>>> partition
>>>>>>> counter
>>>>>>>>>>>> before which there are no prepared transactions.
>>>>>>>>>>>>
>>>>>>>>>>>> And the we send to supplier node a message:
"Give me all
>>>>>> updates
>>>>>>> starting
>>>>>>>>>>>> from that LWM plus data for that transactions
which were
>>>> active
>>>>>>> when I
>>>>>>>>>>>> failed".
>>>>>>>>>>>>
>>>>>>>>>>>> Am I right?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:22 AM Seliverstov
Igor <
>>>>>>> gvvinblade@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Igniters,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently I’m working on possible approaches
how to
>>>> implement
>>>>>>> historical
>>>>>>>>>>>>> rebalance (delta rebalance using WAL
iterator) over MVCC
>>>>>> caches.
>>>>>>>>>>>>> The main difficulty is that MVCC writes
changes on tx
>>>> active
>>>>>>> phase while
>>>>>>>>>>>>> partition update version, aka update
counter, is being
>>>>>> applied
>>>>>>> on tx
>>>>>>>>>>>>> finish. This means we cannot start iteration
over WAL right
>>>>>>> from the
>>>>>>>>>>>>> pointer where the update counter updated,
but should
>>>> include
>>>>>>> updates,
>>>>>>>>>>>> which
>>>>>>>>>>>>> the transaction that updated the counter
did.
>>>>>>>>>>>>>
>>>>>>>>>>>>> These updates may be much earlier than
the point where the
>>>>>>> update
>>>>>>>>>>>> counter
>>>>>>>>>>>>> was updated, so we have to be able to
identify the point
>>>>>> where
>>>>>>> the first
>>>>>>>>>>>>> update happened.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The proposed approach includes:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) preserve list of active txs, sorted
by the time of their
>>>>>>> first update
>>>>>>>>>>>>> (using WAL ptr of first WAL record in
tx)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2) persist this list on each checkpoint
(together with
>>>> TxLog
>>>>>> for
>>>>>>>>>>>> example)
>>>>>>>>>>>>> 4) send whole active tx list (transactions
which were in
>>>>>> active
>>>>>>> state at
>>>>>>>>>>>>> the time the node was crushed, empty
list in case of
>>>> graceful
>>>>>>> node
>>>>>>>>>>>> stop) as
>>>>>>>>>>>>> a part of partition demand message.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4) find a checkpoint where the earliest
tx exists in
>>>>>> persisted
>>>>>>> txs and
>>>>>>>>>>>> use
>>>>>>>>>>>>> saved WAL ptr as a start point or apply
current approach in
>>>>>>> case the
>>>>>>>>>>>> active
>>>>>>>>>>>>> tx list (sent on previous step) is empty
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5) start iteration.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Your thoughts?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Igor
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Ivan Pavlukhin
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Ivan Pavlukhin
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Ivan Pavlukhin
>>>>>>>

Mime
View raw message