bookkeeper-distributedlog-dev mailing list archives

From Khurrum Nasim <>
Subject Re: [Discuss] Transaction Support
Date Sun, 18 Sep 2016 02:25:53 GMT

The "stream level" transaction makes more sense to me. It is an extension
of 'atomic write' records over the size limitation. I can see the value of
having such ability.

But I am not convinced by the namespace level transaction. Do you have any
concrete use cases that you can talk about more?

- KN

On Mon, Sep 12, 2016 at 5:20 PM, Xi Liu <> wrote:

> Hello,
> I asked about transaction support in the distributedlog user group two months
> ago. I want to raise this again, as we are looking to use
> distributedlog for building a transactional data service. It is a major
> feature that is missing in distributedlog. We have some ideas to add this
> to distributedlog and want to know if they make sense or not. If they are
> good, we'd like to contribute and develop with the community.
> Here are the thoughts:
> -------------------------------------------------
> From our understanding, DL can provide "at-least-once" delivery semantics
> (if not, please correct me) but not "exactly-once" delivery semantics. That
> means that a message can be delivered one or more times if the reader
> doesn't handle duplicates.
> The duplicates come from two places: one is on the writer side (this assumes
> using the write proxy, not the core library), while the other is on the reader
> side.
> - writer side: if the client attempts to write a record to the write
> proxies, gets a network error (e.g. a timeout) and then retries, the retry
> can result in duplicates.
> - reader side: if the reader reads a message from a stream and then crashes,
> it restarts from the last known position (DLSN).
> If the reader fails after processing a record and before recording the
> position, the processed record will be delivered again.
> The reader problem can be properly addressed by making use of the sequence
> numbers of records and doing proper checkpointing. For example, a
> database can checkpoint the indexed data with the sequence number of
> records; Flink can checkpoint the state with the sequence numbers.
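
The reader-side checkpointing described above could look roughly like the sketch below. All names here (`CheckpointingReader`, `Checkpoint`, `process`) are hypothetical and not part of the DistributedLog API, and a plain `long` stands in for the record position (DLSN or sequence id). The point is only that state and position are saved together, so a redelivered record can be recognized and skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: these types are not part of DistributedLog.
// A plain long stands in for the record position (DLSN or sequence id).
final class CheckpointingReader {
    // State and position are captured together so they can never disagree.
    static final class Checkpoint {
        final long lastApplied;
        final List<String> state;

        Checkpoint(long lastApplied, List<String> state) {
            this.lastApplied = lastApplied;
            this.state = Collections.unmodifiableList(new ArrayList<>(state));
        }
    }

    private long lastApplied;
    private final List<String> state;

    CheckpointingReader(Checkpoint cp) {
        this.lastApplied = cp.lastApplied;
        this.state = new ArrayList<>(cp.state);
    }

    // Applies a record unless the checkpoint already covers its position.
    boolean process(long position, String payload) {
        if (position <= lastApplied) {
            return false; // redelivered after a restart: skip it
        }
        state.add(payload);
        lastApplied = position;
        return true;
    }

    // Snapshots state and position in one step.
    Checkpoint checkpoint() {
        return new Checkpoint(lastApplied, state);
    }
}
```

A reader restored from a checkpoint taken at position 1 would skip a redelivered record at position 1 and apply the record at position 2 exactly once.
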
> The writer problem can be addressed by implementing an idempotent writer.
> However, an alternative and more powerful approach is to support
> transactions.
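
For comparison, the idempotent-writer alternative mentioned above might be sketched as follows. This is an assumption about one common way to do it, not something the proposal specifies: the writer tags each record with its own id and a sequence number that it reuses unchanged on retry, and the receiving side drops anything it has already accepted. `IdempotentAppender` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the receiving side of an idempotent writer.
// Not part of DistributedLog; names and shapes are illustrative only.
final class IdempotentAppender {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number accepted so far, per writer id.
    private final Map<String, Long> highestSeq = new HashMap<>();

    // Returns true if the record was appended, false if it was a retry
    // of a record that had already been accepted.
    boolean append(String writerId, long seq, String payload) {
        Long highest = highestSeq.get(writerId);
        if (highest != null && seq <= highest) {
            return false;
        }
        highestSeq.put(writerId, seq);
        log.add(payload);
        return true;
    }

    int size() {
        return log.size();
    }
}
```
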
> *What does transaction mean?*
> A transaction means a collection of records can be written transactionally
> within a stream or across multiple streams. They will be consumed by the
> reader together when a transaction is committed, or will never be consumed
> by the reader when the transaction is aborted.
> The transaction will provide the following guarantees:
> - The reader should not be exposed to records written from uncommitted
> transactions (mandatory)
> - The reader should consume the records in the transaction commit order
> rather than the record written order (mandatory)
> - No duplicated records within a transaction (mandatory)
> - Allow interleaving transactional writes and non-transactional writes
> (optional)
> *Stream Transaction & Namespace Transaction*
> There will be two types of transaction, one is Stream level transaction
> (local transaction), while the other one is Namespace level transaction
> (global transaction).
> The stream level transaction is a transactional operation on writing
> records to one stream; the namespace level transaction is a transactional
> operation on writing records to multiple streams.
> *Implementation Thoughts*
> - A transaction consists of a begin control record, a series of data
> records and a commit/abort control record.
> - The begin/commit/abort control record is written to a `commit` log
> stream, while the data records will be written to normal data log streams.
> - The `commit` log stream will be the same log stream for stream-level
> transactions, while it will be a *system* stream (or multiple system
> streams) for namespace-level transactions.
> - The transaction code looks like the following:
> <code>
> Transaction txn = client.transaction();
> Future<DLSN> result1 = txn.write(stream0, record);
> Future<DLSN> result2 = txn.write(stream1, record);
> Future<DLSN> result3 = txn.write(stream2, record);
> Future<Pair<DLSN, DLSN>> result = txn.commit();
> </code>
> If the txn is committed, all the write futures will be satisfied with their
> written DLSNs. If the txn is aborted, all the write futures will fail
> together. There is no partial failure state.
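
The all-or-nothing behaviour of the write futures can be illustrated with a small in-memory sketch. It assumes `java.util.concurrent.CompletableFuture` in place of the `Future` type in the proposal and a `long` in place of `DLSN`; `SketchTransaction` and its methods are hypothetical, and nothing is actually written to a log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory sketch; it does not talk to DistributedLog.
final class SketchTransaction {
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private boolean finished = false;

    // Registers a write. A real implementation would also send the record;
    // here only the future is tracked, and it stays pending until the
    // transaction commits or aborts.
    CompletableFuture<Long> write(String stream, String record) {
        ensureOpen();
        CompletableFuture<Long> result = new CompletableFuture<>();
        pending.add(result);
        return result;
    }

    // Satisfies every write future together. The position handed out is the
    // commit position plus the write's local sequence number (an assumption
    // borrowed from the DLSN discussion in the proposal).
    void commit(long commitPosition) {
        ensureOpen();
        finished = true;
        for (int localSeq = 0; localSeq < pending.size(); localSeq++) {
            pending.get(localSeq).complete(commitPosition + localSeq);
        }
    }

    // Fails every write future together; there is no partial outcome.
    void abort(Throwable cause) {
        ensureOpen();
        finished = true;
        for (CompletableFuture<Long> result : pending) {
            result.completeExceptionally(cause);
        }
    }

    private void ensureOpen() {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
    }
}
```
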
> - The actual data flow will be:
> 1. the writer gets a transaction id from the owner of the `commit` log stream
> 2. it writes the begin control record (synchronously) with the transaction id
> 3. each write within the same txn is assigned a local sequence
> number starting from 0. The combination of transaction id and local
> sequence number will be used later on by the readers to de-duplicate
> records.
> 4. the commit/abort control record is written based on the results
> from step 3.
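
The de-duplication by transaction id and local sequence number in the data flow above could be sketched on the reader side like this. `TxnDeduplicator` is a hypothetical name, and keeping the seen set in memory per open transaction is an assumption; the proposal does not say where this bookkeeping lives.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: remembers which (transaction id, local sequence number)
// pairs have been seen, so that a retried write is delivered only once.
final class TxnDeduplicator {
    private final Map<Long, Set<Integer>> seen = new HashMap<>();

    // Returns true the first time a pair is seen, false for any repeat.
    boolean firstDelivery(long txnId, int localSeq) {
        return seen.computeIfAbsent(txnId, id -> new HashSet<>()).add(localSeq);
    }

    // Drops the bookkeeping once the transaction is committed or aborted.
    void forget(long txnId) {
        seen.remove(txnId);
    }
}
```
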
> - The application can supply a timeout for the transaction when it calls
> #begin(). The owner of the `commit` log stream can abort transactions
> that are not committed or aborted within their timeout.
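
One way the owner of the `commit` log stream might track transaction timeouts is sketched below. `TxnTimeoutTracker` and its methods are hypothetical, and the clock is passed in explicitly so the behaviour is easy to follow; the caller would write an abort control record for each id returned by `expire`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of timeout bookkeeping on the commit-log owner.
final class TxnTimeoutTracker {
    // Open transactions and the time (in ms) at which each one expires.
    private final Map<Long, Long> deadlines = new LinkedHashMap<>();

    void begin(long txnId, long nowMillis, long timeoutMillis) {
        deadlines.put(txnId, nowMillis + timeoutMillis);
    }

    // Called when the writer commits or aborts on its own.
    void finish(long txnId) {
        deadlines.remove(txnId);
    }

    // Returns the ids of transactions whose deadline has passed and stops
    // tracking them.
    List<Long> expire(long nowMillis) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```
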
> - Failures:
> * all the log records can be simply retried as they will be de-duplicated
> probably at the reader side.
> - Reader:
> * Reader can be configured to read uncommitted records or committed records
> only (by default read uncommitted records)
> * If the reader is configured to read committed records only, the read ahead
> cache will be changed to maintain one additional map of pending committed
> records. The pending committed records map is bounded, and records will be
> dropped as read ahead moves forward.
> * When the reader hits a commit record, it will rewind to the begin record
> and start reading from there. By leveraging the read ahead cache and the
> pending commit records cache, this should work well for both short
> transactions and long transactions.
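
The committed-only read path could be modelled, very roughly, as below. This sketch covers only the pending-records map: it buffers data records per transaction and releases them when the commit record arrives. It does not model rewinding to the begin record, the read ahead cache, or bounded eviction, and `CommittedOnlyBuffer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a committed-only reader's pending-records map.
final class CommittedOnlyBuffer {
    // Pending records per open transaction, ordered by local sequence number.
    private final Map<Long, TreeMap<Integer, String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    void onBegin(long txnId) {
        pending.put(txnId, new TreeMap<Integer, String>());
    }

    void onData(long txnId, int localSeq, String payload) {
        TreeMap<Integer, String> records = pending.get(txnId);
        if (records != null) {
            records.putIfAbsent(localSeq, payload); // a retried write collapses
        }
    }

    // Records become visible only here, so delivery follows commit order.
    void onCommit(long txnId) {
        TreeMap<Integer, String> records = pending.remove(txnId);
        if (records != null) {
            delivered.addAll(records.values());
        }
    }

    // Records of an aborted transaction are never delivered.
    void onAbort(long txnId) {
        pending.remove(txnId);
    }

    List<String> delivered() {
        return delivered;
    }
}
```

With two interleaved transactions, the one whose commit record arrives first is delivered first, regardless of the order in which the data records were written.
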
> - DLSN, SequenceId:
> * We will add a fourth field to DLSN. It is `local sequence number` within
> a transaction session. So the new DLSN of records in a transaction will be
> the DLSN of commit control record plus its local sequence number.
> * The sequence id will still be the position of the commit record plus its
> local sequence number. The position will be advanced by the total number of
> written records when the commit control record is written.
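
To make the four-field DLSN concrete, here is a hypothetical `TxnDlsn` type. It assumes the three existing fields are the log segment sequence number, entry id and slot id, and adds the local sequence number as the least significant field for ordering. This is an illustration of the idea, not the actual `DLSN` class.

```java
// Hypothetical sketch of a DLSN extended with a local sequence number.
final class TxnDlsn implements Comparable<TxnDlsn> {
    final long logSegmentSequenceNo;
    final long entryId;
    final long slotId;
    final long localSeq; // position of the record within its transaction

    TxnDlsn(long logSegmentSequenceNo, long entryId, long slotId, long localSeq) {
        this.logSegmentSequenceNo = logSegmentSequenceNo;
        this.entryId = entryId;
        this.slotId = slotId;
        this.localSeq = localSeq;
    }

    // Position of a transactional record: the commit record's position
    // combined with the record's local sequence number.
    TxnDlsn withLocalSeq(long newLocalSeq) {
        return new TxnDlsn(logSegmentSequenceNo, entryId, slotId, newLocalSeq);
    }

    // Field-by-field ordering, with localSeq breaking ties last.
    @Override
    public int compareTo(TxnDlsn other) {
        int c = Long.compare(logSegmentSequenceNo, other.logSegmentSequenceNo);
        if (c == 0) c = Long.compare(entryId, other.entryId);
        if (c == 0) c = Long.compare(slotId, other.slotId);
        if (c == 0) c = Long.compare(localSeq, other.localSeq);
        return c;
    }
}
```
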
> - Transaction Group & Namespace Transaction
> Using one single log stream for namespace transactions can become a
> bottleneck, since all the begin/commit/abort control records will have
> to go through one log stream.
> The idea of a 'transaction group' is to allow partitioning the writers into
> different transaction groups.
> Clients can specify the `group-name` when starting the transaction. If
> no `group-name` is specified, it will use the default `commit` log in
> the namespace for creating transactions.
> -------------------------------------------------
> I'd like to collect feedback on this idea. Appreciate any comments and if
> anyone is also interested in this idea, we'd like to collaborate with the
> community.
> - Xi
