distributedlog-dev mailing list archives

From: Xi Liu <xi.liu....@gmail.com>
Subject: Re: Proxy Client - Batch Ordering / Commit
Date: Thu, 08 Dec 2016 17:20:32 GMT
On Mon, Dec 5, 2016 at 9:05 AM, Leigh Stewart <lstewart@twitter.com.invalid>
wrote:

> Great discussion here :)
>
> Have you started any work for this? I just updated the proposal page -
> > https://cwiki.apache.org/confluence/display/DL/DP-2+-+
> Epoch+Write+Support
> > Maybe we can work together with this.
>
>
> Looks good Xi.
>
> If I understand the proposal correctly, this gets us, in the thin client:
> a) the ability to achieve a very strict form of consistency, supporting e.g.
> exactly-once updates
> b) exclusive ownership
>

Yes, that is correct. Also, the other proposal, DP-1 (transaction support),
would leverage this proposal to achieve idempotent writes on thin clients.


>
> I think there may also be a need for some kind of large atomic update
> functionality, which is a different way of looking at
> consistent/transactional updates.
>
> Cameron, earlier Sijie asked if you need > 1MB writes - is that a
> requirement for you? If so, epoch write may not meet all of your
> requirements above.
>

If I understand correctly, with the fencing operation the client itself can
de-duplicate 'lost-ack' records by checking for them after fencing.
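
To make that concrete, here is a rough sketch of the client-side de-duplication
path I have in mind. The fenceThenGetLastDLSN() and readRange() calls are
illustrative names following this proposal, not existing APIs, and the
checkpointed DLSN is assumed to be tracked by the application:

// Rough sketch only. Assumes DLSN, LogRecordWithDLSN and java.util.Arrays are
// imported, and 'client' is a handle exposing the proposed
// fenceThenGetLastDLSN() plus an illustrative readRange() helper.
DLSN recoverLostAck(String stream, byte[] payload, DLSN lastCheckpointedDLSN)
        throws Exception {
    // bump the session/epoch first, so no in-flight write with the old
    // session can land after this point
    DLSN fenceDLSN = client.fenceThenGetLastDLSN(stream);
    // the lost-ack write, if it landed, must be inside this bounded range
    for (LogRecordWithDLSN record :
            client.readRange(stream, lastCheckpointedDLSN, fenceDLSN)) {
        if (Arrays.equals(record.getPayload(), payload)) {
            return record.getDlsn();   // the write landed; do not retry
        }
    }
    return null;                       // not found; safe to retry with the new session
}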


>
> We actually have a fairly urgent business need to support something like
> this. Our two use cases are:
> 1. apply a set of logically separate writes as an atomic batch (we want
> them all to be visible at the same time, or not at all). It might be useful
> to be able to aggregate updates and apply them at a time of our choosing.
> 2. large writes: DL writes currently have a 1MB limit. We will need to apply
> updates which are larger than 1MB atomically.
>

I am working on finalizing my proposal for DP-1 (transaction support). I can
probably send it out by the end of this week.
We can collaborate on this.
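
(For use case 1, as long as the combined batch stays under the 1MB limit, the
record-set write mentioned further down in this thread already gives
all-or-nothing visibility; DP-1 is aimed at the batches that do not fit. A rough
sketch - packRecordSet() and the writeRecordSet signature shown here are
illustrative placeholders, not the exact API:)

// Illustrative only: pack several logically separate updates into one record
// set so they become visible atomically; the 1MB cap is the current limit
// mentioned later in this thread.
List<byte[]> updates = Arrays.asList(updateA, updateB, updateC);
ByteBuffer recordSet = packRecordSet(updates);       // hypothetical framing helper
if (recordSet.remaining() > 1024 * 1024) {
    // too large for a single atomic write today - this is the gap the
    // DP-1 transaction proposal is meant to close
    throw new IllegalArgumentException("batch exceeds the 1MB limit");
}
Future<DLSN> result = client.writeRecordSet(streamName, recordSet);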


>
> On Fri, Nov 18, 2016 at 2:37 PM, Sijie Guo <sijie@apache.org> wrote:
>
> > On Thu, Nov 17, 2016 at 2:30 AM, Xi Liu <xi.liu.ant@gmail.com> wrote:
> >
> > > Cameron,
> > >
> > > Thank you for your comments. It's very helpful. My replies are inline.
> > >
> > > On Wed, Nov 16, 2016 at 11:59 AM, Cameron Hatfield <kinguy@gmail.com>
> > > wrote:
> > >
> > > > "A couple of questions" is what I originally wrote, and then the
> > > following
> > > > happened. Sorry about the large swath of them, making sure my
> > > understanding
> > > > of the code base, as well as the DL/Bookkeeper/ZK ecosystem
> > interaction,
> > > > makes sense.
> > > >
> > > > ==General:
> > > > What is an exclusive session? What is it providing over a regular
> > > session?
> > > >
> > >
> > > The idea here is to provide exclusive writer semantic for the
> > > distributedlog (thin) client use case.
> > > So it can have similar behavior as using the storage
> > (distributedlog-core)
> > > library directly.
> > >
> >
> > +1 for an exclusive writer feature. Leigh and I were talking about this
> > before. I am glad to see it happening now. However, it might be worth
> > separating the exclusive writer feature into its own task once we have the
> > 'fencing' feature available.
> >
> >
> > >
> > >
> > > >
> > > >
> > > > ==Proxy side:
> > > > Should a new streamop be added for the fencing operation, or does it
> > make
> > > > sense to piggyback on an existing one (such as write)?
> > >
> > >
> > > > ====getLastDLSN:
> > > > What should be the return result for:
> > > > A new stream
> > > > A new session, after successful fencing
> > > > A new session, after a change in ownership / first starting up
> > > >
> > > > What is the main use case for getLastDLSN(<stream>, false)? Is this
> to
> > > > guarantee that the recovery process has happened in case of ownership
> > > > failure (I don't have a good understanding of what causes the
> recovery
> > > > process to happen, especially from the reader side)? Or is it to
> handle
> > > the
> > > > lost-ack problem? Since all the rest of the read related things go
> > > through
> > > > the read client, I'm not sure if I see the use case, but it seems
> like
> > > > there would be a large potential for confusion on which to use. What
> > > about
> > > > just a fenceSession op, that always fences, returning the DLSN of the
> > > > fence, and leave the normal getLastDLSN for the regular read client.
> > > >
> > >
> > >
> > > Ah, your point is valid. I was following the style of bookkeeper. The
> > > bookkeeper client supplies a fencing flag on the readLastAddConfirmed
> > > request during recovery.
> > >
> > > But you are right. It is cleaner to have just a fenceSessionOp and return
> > > the DLSN of the fence request.
> > >
> >
> >
> > Correct. In the bookkeeper client, the fencing flag is set with a read op.
> > However, it is part of the recovery procedure and internal to the client. I
> > agree with Cameron that we should hide the details from the public API;
> > otherwise it will cause confusion.
> >
> >
> > >
> > >
> > >
> > > >
> > > > ====Fencing:
> > > > When a fence session occurs, what call needs to be made to make sure
> > any
> > > > outstanding writes are flushed and committed (so that we guarantee
> the
> > > > client will be able to read anything that was in the write queue)?
> > > > Is there a guaranteed ordering for things written in the future queue
> > for
> > > > AsyncLogWriter (I'm not quite confident that I was able to accurately
> > > > follow the logic, as there are many parts of the code that write, have
> > > > queues, heartbeat, etc)?
> > > >
> > >
> > > I believe that when calling AsyncLogWriter's asyncWrite in order, the
> > > records will be written in order. Sijie or Leigh can confirm that.
> > >
> >
> > That's right. The writes are guaranteed to be written in the order in which
> > they are issued.
> >
> >
> > >
> > > Since the writes are in order, when a fence session occurs, the control
> > > record written successfully by the writer guarantees that the writes
> > > issued before the fence-session write are flushed and committed.
> > >
> > > We need to invalidate the session when we fence it, so any writes carrying
> > > the old session that arrive after the control record is written will be
> > > rejected. In this way, the client is guaranteed to be able to read
> > > everything in a consistent way.
> > >
> > >
> > > >
> > > > ====SessionID:
> > > > What is the default sessionid / transactionid for a new stream? I
> > assume
> > > > this would just be the first control record
> > > >
> > >
> > > The initial session id will be the transaction id of the first control
> > > record.
> > >
> > >
> > > >
> > > > ======Should all streams have a sessionid by default, regardless of
> > > > whether it is ever used by a client (aka, every time ownership changes,
> > > > a new control record is generated, and a sessionid is stored)?
> > > > Main edge case that would have to be handled is if a client writes
> with
> > > an
> > > > old sessionid, but the owner has changed and has yet to create a
> > > sessionid.
> > > > This should be handled by the "non-matching sessionid" rule, since
> the
> > > > invalid sessionid wouldn't match the passed sessionid, which should
> > cause
> > > > the client to get a new sessionid.
> > > >
> > >
> > > I think all streams should just have a session id by default. The
> session
> > > id is changed when ownership is changed or it is explicitly bumped by a
> > > fence session op.
> > >
> > >
> > > >
> > > > ======Where in the code does it make sense to own the session, the
> > stream
> > > > interfaces / classes? Should they pass that information down to the
> > ops,
> > > or
> > > > do the sessionid check within?
> > > > My first thought would be Stream owns the sessionid, passes it into
> the
> > > ops
> > > > (as either a nullable value, or an invalid default value), which then
> > do
> > > > the sessionid check if they care. The main issue is updating the
> > > sessionid
> > > > is a bit backwards, as either every op has the ability to update it
> > > through
> > > > some type of return value / direct stream access / etc, or there is a
> > > > special case in the stream for the fence operation / any other
> > operation
> > > > that can update the session.
> > > >
> > >
> > > My thought is to add the session id to Stream (StreamImpl.java). The
> > > stream validates the session id before submitting a stream op. If it is a
> > > fence session op, it would just invalidate the current session, so
> > > subsequent requests with the old session will be rejected.
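
(As a rough illustration of that check - the type and field names below are
assumptions for this sketch, not the actual StreamImpl members:)

// Sketch of the per-stream session check on the proxy (illustrative names only).
// currentSessionId would be seeded from the first control record's transaction
// id and bumped on ownership change or a fence-session op.
synchronized Future<WriteResponse> submitOp(StreamOp op) {
    if (op instanceof FenceSessionOp) {
        currentSessionId = nextTransactionId();   // invalidate the old session
        return executeOp(op);                     // returns the DLSN of the fence record
    }
    if (op.getSessionId() != currentSessionId) {
        // stale session: reject without attempting the write, and piggyback
        // the new session id on the response
        return Future.value(sessionFencedResponse(currentSessionId));
    }
    return executeOp(op);
}
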
> > >
> >
> > >
> > > >
> > > > ======For "the owner of the log stream will first advance the
> > transaction
> > > > id generator to claim a new transaction id and write a control record
> > to
> > > > the log stream. ":
> > > > Should "DistributedLogConstants.CONTROL_RECORD_CONTENT" be the type
> of
> > > > control record written?
> > > > Should the "writeControlRecord" on the BKAsyncLogWriter be exposed in
> > the
> > > > AsyncLogWriter interface be exposed?  Or even in the one within the
> > > segment
> > > > writer? Or should the code be duplicated / pulled out into a helper /
> > > etc?
> > > > (Not a big Java person, so any suggestions on the "Java Way", or at
> > least
> > > > the DL way, to do it would be appreciated)
> > > >
> > >
> > > I believe we can just construct a log record and set it to be a control
> > > record and write it.
> > >
> > > LogRecord record = ...
> > > record.setControl();
> > > writer.write(record);
> > >
> > > (Can anyone from the community confirm that it is okay to write a control
> > > record in this way?)
> > >
> >
> > Ideally we would like to hide the logic from public usage. However, since
> > it is a code change on the proxy side, it is absolutely fine.
> >
> >
> > >
> > >
> > >
> > > >
> > > > ======Transaction ID:
> > > > The BKLogSegmentWriter ignores the transaction ids from control records
> > > > when it records the "LastTXId." Would that be an issue here for anything?
> > > > It looks like it may do that because it assumes you're calling its local
> > > > function for writing a control record, which uses the lastTxId.
> > > >
> > >
> > > I think the lastTxId is for the last transaction id of user records, so we
> > > probably shouldn't change the behavior of how we record the lastTxId.
> > > However, we can change how we fetch the last tx id for id generation after
> > > recovery.
> > >
> >
> >
> > getLastTxId should have a flag to include control records or not.
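
(i.e. something along the lines of the following illustrative signature - not
the current API:)

// Illustrative signature only: expose whether control records are counted.
Future<Long> getLastTxId(boolean includeControlRecords);
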
> >
> >
> > >
> > >
> > > >
> > > >
> > > > ==Thrift Interface:
> > > > ====Should the write response be split out for different calls?
> > > > It seems odd to have a single struct with many optional items that
> are
> > > > filled depending on the call made for every rpc call. This is mostly
> a
> > > > curiosity question, since I assume it comes from the general practice
> > > > of using thrift for a while. Would it at least make sense for the
> > > > getLastDLSN/fence endpoint to have a new struct?
> > > >
> > >
> > > I don't have any preference here. It might make sense to have a new
> > struct.
> > >
> > >
> > > >
> > > > ====Any particular error code that makes sense for session fenced? If
> > we
> > > > want to be close to the HTTP errors, looks like 412 (PRECONDITION
> > FAILED)
> > > > might make the most sense, if a bit generic.
> > > >
> > > > 412 def:
> > > > "The precondition given in one or more of the request-header fields
> > > > evaluated to false when it was tested on the server. This response
> code
> > > > allows the client to place preconditions on the current resource
> > > > metainformation (header field data) and thus prevent the requested
> > method
> > > > from being applied to a resource other than the one intended."
> > > >
> > > > 412 excerpt from the If-match doc:
> > > > "This behavior is most useful when the client wants to prevent an
> > > updating
> > > > method, such as PUT, from modifying a resource that has changed since
> > the
> > > > client last retrieved it."
> > > >
> > >
> > > I like your idea.
> > >
> > >
> > > >
> > > > ====Should we return the sessionid to the client in the
> "fencesession"
> > > > calls?
> > > > Seems like it may be useful when you fence, especially if you have
> some
> > > > type of custom sequencer where it would make sense for search, or for
> > > > debugging.
> > > > Main minus is that it would be easy for users to create an implicit
> > > > requirement that the sessionid is forever a valid transactionid,
> which
> > > may
> > > > not always be the case long term for the project.
> > > >
> > >
> > > I think we should probably start by not returning it, and add it if we
> > > really need it.
> > >
> > >
> > > >
> > > >
> > > > ==Client:
> > > > ====What is the proposed process for the client retrieving the new
> > > > sessionid?
> > > > A full reconnect? No special case code, but intrusive on the client
> > side,
> > > > and possibly expensive garbage/processing wise. (though this type of
> > > > failure should hopefully be rare enough to not be a problem)
> > > > A call to reset the sessionid? Less intrusive, all the issues you get
> > > with
> > > > mutable object methods that need to be called in a certain order,
> edge
> > > > cases such as outstanding/buffered requests to the old stream, etc.
> > > > The call could also return the new sessionid, making it a good call
> for
> > > > storing or debugging the value.
> > > >
> > >
> > > It is probably not good to tie a session to a connection, especially
> > > since the underlying communication is an RPC framework.
> > >
> > > I think the new session id can be piggybacked on the rejected response. The
> > > client doesn't have to explicitly retrieve a new session id.
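
(A minimal sketch of what that looks like from the client side - the session
argument to write() and the SessionFencedException accessor are assumed shapes
from this proposal, not existing APIs:)

// Illustrative only: refresh the cached session id from the rejection and
// retry, since a session-fenced rejection means the write was never attempted.
DLSN writeWithSession(String stream, ByteBuffer data) throws Exception {
    try {
        return Await.result(client.write(stream, data, cachedSessionId));
    } catch (SessionFencedException sfe) {
        cachedSessionId = sfe.getCurrentSessionId();   // piggybacked new session id
        return Await.result(client.write(stream, data, cachedSessionId));
    }
}
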
> > >
> >
> > +1 for separating the session from the connection, especially since the
> > 'session' here is more of a lifecycle concept for a stream.
> >
> >
> >
> > >
> > >
> > > >
> > > > ====Session Fenced failure:
> > > > Will this put the client into a failure state, stopping all future
> > writes
> > > > until fixed?
> > > >
> > >
> > > My thought is that the new session id will be piggybacked on the fence
> > > response, so the client will know the new session id and all future writes
> > > will just carry the new session id.
> > >
> > >
> > > > Is it even possible to get this error when ownership changes? The
> > > > connection to the new owner should get a new sessionid on connect,
> so I
> > > > would expect not.
> > > >
> > >
> > > I think it can happen. But the client should handle it and retry, since the
> > > session-fenced exception indicates that the write was never attempted.
> > >
> > >
> > > >
> > > > Cheers,
> > > > Cameron
> > > >
> > > > On Tue, Nov 15, 2016 at 2:01 AM, Xi Liu <xi.liu.ant@gmail.com>
> wrote:
> > > >
> > > > > Thank you, Cameron. Look forward to your comments.
> > > > >
> > > > > - Xi
> > > > >
> > > > > On Sun, Nov 13, 2016 at 1:21 PM, Cameron Hatfield <
> kinguy@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Sorry, I've been on vacation for the past week, and heads down
> for
> > a
> > > > > > release that is using DL at the end of Nov. I'll take a look at
> > this
> > > > over
> > > > > > the next week, and add any relevant comments. After we are
> finished
> > > > with
> > > > > > dev for this release, I am hoping to tackle this next.
> > > > > >
> > > > > > -Cameron
> > > > > >
> > > > > > On Fri, Nov 11, 2016 at 12:07 PM, Sijie Guo <sijie@apache.org>
> > > wrote:
> > > > > >
> > > > > > > Xi,
> > > > > > >
> > > > > > > Thank you so much for your proposal. I took a look. It looks
> fine
> > > to
> > > > > me.
> > > > > > > Cameron, do you have any comments?
> > > > > > >
> > > > > > > Look forward to your pull requests.
> > > > > > >
> > > > > > > - Sijie
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Nov 9, 2016 at 2:34 AM, Xi Liu <xi.liu.ant@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Cameron,
> > > > > > > >
> > > > > > > > Have you started any work for this? I just updated the
> proposal
> > > > page
> > > > > -
> > > > > > > > https://cwiki.apache.org/confluence/display/DL/DP-2+-+
> > > > > > > Epoch+Write+Support
> > > > > > > > Maybe we can work together with this.
> > > > > > > >
> > > > > > > > Sijie, Leigh,
> > > > > > > >
> > > > > > > > can you guys help review this to make sure our proposal is in
> > the
> > > > > right
> > > > > > > > direction?
> > > > > > > >
> > > > > > > > - Xi
> > > > > > > >
> > > > > > > > On Tue, Nov 1, 2016 at 3:05 AM, Sijie Guo <sijie@apache.org>
> > > > wrote:
> > > > > > > >
> > > > > > > > > I created https://issues.apache.org/jira/browse/DL-63 for
> > > > tracking
> > > > > > the
> > > > > > > > > proposed idea here.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Oct 26, 2016 at 4:53 PM, Sijie Guo
> > > > > > <sijieg@twitter.com.invalid
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > On Tue, Oct 25, 2016 at 11:30 AM, Cameron Hatfield <
> > > > > > kinguy@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Yes, we are reading the HBase WAL (from their
> replication
> > > > > plugin
> > > > > > > > > > support),
> > > > > > > > > > > and writing that into DL.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Gotcha.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > From the sounds of it, yes, it would. Only thing I
> would
> > > say
> > > > is
> > > > > > > make
> > > > > > > > > the
> > > > > > > > > > > epoch requirement optional, so that if a client doesn't
> > > care
> > > > > > about
> > > > > > > > > dupes
> > > > > > > > > > > they don't have to deal with the process of getting a
> new
> > > > > epoch.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yup. This should be optional. I can start a wiki page on
> > how
> > > we
> > > > > > want
> > > > > > > to
> > > > > > > > > > implement this. Are you interested in contributing to
> this?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > -Cameron
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Oct 19, 2016 at 7:43 PM, Sijie Guo
> > > > > > > > <sijieg@twitter.com.invalid
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <
> > > > > sijieg@twitter.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Monday, October 17, 2016, Cameron Hatfield <
> > > > > > > kinguy@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Answer inline:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <
> > > > > > sijie@apache.org
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > Cameron,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thank you for your summary. I liked the
> discussion
> > > > > here. I
> > > > > > > > also
> > > > > > > > > > > liked
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > summary of your requirement -
> > > 'single-writer-per-key,
> > > > > > > > > > > > >> > multiple-writers-per-log'. If I understand
> > > correctly,
> > > > > the
> > > > > > > core
> > > > > > > > > > > concern
> > > > > > > > > > > > >> here
> > > > > > > > > > > > >> > is almost 'exact-once' write (or a way to
> explicit
> > > > tell
> > > > > > if a
> > > > > > > > > write
> > > > > > > > > > > can
> > > > > > > > > > > > >> be
> > > > > > > > > > > > >> > retried or not).
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Comments inline.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron
> > Hatfield <
> > > > > > > > > > > kinguy@gmail.com>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > > Ah- yes good point (to be clear we're not
> > using
> > > > the
> > > > > > > proxy
> > > > > > > > > this
> > > > > > > > > > > way
> > > > > > > > > > > > >> > > today).
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > > Due to the source of the
> > > > > > > > > > > > >> > > > > data (HBase Replication), we cannot
> > guarantee
> > > > > that a
> > > > > > > > > single
> > > > > > > > > > > > >> partition
> > > > > > > > > > > > >> > > will
> > > > > > > > > > > > >> > > > > be owned for writes by the same client.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > Do you mean you *need* to support multiple
> > > writers
> > > > > > > issuing
> > > > > > > > > > > > >> interleaved
> > > > > > > > > > > > >> > > > writes or is it just that they might
> sometimes
> > > > > > > interleave
> > > > > > > > > > writes
> > > > > > > > > > > > and
> > > > > > > > > > > > >> > you
> > > > > > > > > > > > >> > > >don't care?
> > > > > > > > > > > > >> > > How HBase partitions the keys being written
> > > wouldn't
> > > > > > have
> > > > > > > a
> > > > > > > > > > > one->one
> > > > > > > > > > > > >> > > mapping with the partitions we would have in
> > > HBase.
> > > > > Even
> > > > > > > if
> > > > > > > > we
> > > > > > > > > > did
> > > > > > > > > > > > >> have
> > > > > > > > > > > > >> > > that alignment when the cluster first started,
> > > HBase
> > > > > > will
> > > > > > > > > > > rebalance
> > > > > > > > > > > > >> what
> > > > > > > > > > > > >> > > servers own what partitions, as well as split
> > and
> > > > > merge
> > > > > > > > > > partitions
> > > > > > > > > > > > >> that
> > > > > > > > > > > > >> > > already exist, causing eventual drift from one
> > log
> > > > per
> > > > > > > > > > partition.
> > > > > > > > > > > > >> > > Because we want ordering guarantees per key
> (row
> > > in
> > > > > > > hbase),
> > > > > > > > we
> > > > > > > > > > > > >> partition
> > > > > > > > > > > > >> > > the logs by the key. Since multiple writers
> are
> > > > > possible
> > > > > > > per
> > > > > > > > > > range
> > > > > > > > > > > > of
> > > > > > > > > > > > >> > keys
> > > > > > > > > > > > >> > > (due to the aforementioned rebalancing /
> > > splitting /
> > > > > etc
> > > > > > > of
> > > > > > > > > > > hbase),
> > > > > > > > > > > > we
> > > > > > > > > > > > >> > > cannot use the core library due to requiring a
> > > > single
> > > > > > > writer
> > > > > > > > > for
> > > > > > > > > > > > >> > ordering.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > But, for a single log, we don't really care
> > about
> > > > > > ordering
> > > > > > > > > aside
> > > > > > > > > > > > from
> > > > > > > > > > > > >> at
> > > > > > > > > > > > >> > > the per-key level. So all we really need to be
> > > able
> > > > to
> > > > > > > > handle
> > > > > > > > > is
> > > > > > > > > > > > >> > preventing
> > > > > > > > > > > > >> > > duplicates when a failure occurs, and ordering
> > > > > > consistency
> > > > > > > > > > across
> > > > > > > > > > > > >> > requests
> > > > > > > > > > > > >> > > from a single client.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > So our general requirements are:
> > > > > > > > > > > > >> > > Write A, Write B
> > > > > > > > > > > > >> > > Timeline: A -> B
> > > > > > > > > > > > >> > > Request B is only made after A has
> successfully
> > > > > returned
> > > > > > > > > > (possibly
> > > > > > > > > > > > >> after
> > > > > > > > > > > > >> > > retries)
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > 1) If the write succeeds, it will be durably
> > > exposed
> > > > > to
> > > > > > > > > clients
> > > > > > > > > > > > within
> > > > > > > > > > > > >> > some
> > > > > > > > > > > > >> > > bounded time frame
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Guaranteed.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > > 2) If A succeeds and B succeeds, the ordering
> > for
> > > > the
> > > > > > log
> > > > > > > > will
> > > > > > > > > > be
> > > > > > > > > > > A
> > > > > > > > > > > > >> and
> > > > > > > > > > > > >> > > then B
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If I understand correctly here, B is only sent
> > > after A
> > > > > is
> > > > > > > > > > returned,
> > > > > > > > > > > > >> right?
> > > > > > > > > > > > >> > If that's the case, It is guaranteed.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > 3) If A fails due to an error that can be
> relied
> > > on
> > > > to
> > > > > > > *not*
> > > > > > > > > be
> > > > > > > > > > a
> > > > > > > > > > > > lost
> > > > > > > > > > > > >> > ack
> > > > > > > > > > > > >> > > problem, it will never be exposed to the
> client,
> > > so
> > > > it
> > > > > > may
> > > > > > > > > > > > (depending
> > > > > > > > > > > > >> on
> > > > > > > > > > > > >> > > the error) be retried immediately
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If it is not a lost-ack problem, the entry will
> be
> > > > > > exposed.
> > > > > > > it
> > > > > > > > > is
> > > > > > > > > > > > >> > guaranteed.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Let me try rephrasing the questions, to make sure
> > I'm
> > > > > > > > > understanding
> > > > > > > > > > > > >> correctly:
> > > > > > > > > > > > >> If A fails, with an error such as "Unable to
> create
> > > > > > connection
> > > > > > > > to
> > > > > > > > > > > > >> bookkeeper server", that would be the type of
> error
> > we
> > > > > would
> > > > > > > > > expect
> > > > > > > > > > to
> > > > > > > > > > > > be
> > > > > > > > > > > > >> able to retry immediately, as that result means no
> > > > action
> > > > > > was
> > > > > > > > > taken
> > > > > > > > > > on
> > > > > > > > > > > > any
> > > > > > > > > > > > >> log / etc, so no entry could have been created.
> This
> > > is
> > > > > > > > different
> > > > > > > > > > > then a
> > > > > > > > > > > > >> "Connection Timeout" exception, as we just might
> not
> > > > have
> > > > > > > > gotten a
> > > > > > > > > > > > >> response
> > > > > > > > > > > > >> in time.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > > Gotcha.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The response code returned from proxy can tell if a
> > > > failure
> > > > > > can
> > > > > > > > be
> > > > > > > > > > > > retried
> > > > > > > > > > > > > safely or not. (We might need to make them well
> > > > documented)
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > 4) If A fails due to an error that could be a
> > lost
> > > > ack
> > > > > > > > problem
> > > > > > > > > > > > >> (network
> > > > > > > > > > > > >> > > connectivity / etc), within a bounded time
> frame
> > > it
> > > > > > should
> > > > > > > > be
> > > > > > > > > > > > >> possible to
> > > > > > > > > > > > >> > > find out if the write succeed or failed.
> Either
> > by
> > > > > > reading
> > > > > > > > > from
> > > > > > > > > > > some
> > > > > > > > > > > > >> > > checkpoint of the log for the changes that
> > should
> > > > have
> > > > > > > been
> > > > > > > > > made
> > > > > > > > > > > or
> > > > > > > > > > > > >> some
> > > > > > > > > > > > >> > > other possible server-side support.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If I understand this correctly, it is a
> > duplication
> > > > > issue,
> > > > > > > > > right?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Can a de-duplication solution work here? Either
> DL
> > > or
> > > > > your
> > > > > > > > > client
> > > > > > > > > > > does
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > de-duplication?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> The requirements I'm mentioning are the ones
> needed
> > > for
> > > > > > > > > client-side
> > > > > > > > > > > > >> dedupping. Since if I can guarantee writes being
> > > exposed
> > > > > > > within
> > > > > > > > > some
> > > > > > > > > > > > time
> > > > > > > > > > > > >> frame, and I can never get into an inconsistently
> > > > ordered
> > > > > > > state
> > > > > > > > > when
> > > > > > > > > > > > >> successes happen, when an error occurs, I can
> always
> > > > wait
> > > > > > for
> > > > > > > > max
> > > > > > > > > > time
> > > > > > > > > > > > >> frame, read the latest writes, and then dedup
> > locally
> > > > > > against
> > > > > > > > the
> > > > > > > > > > > > request
> > > > > > > > > > > > >> I
> > > > > > > > > > > > >> just made.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> The main thing about that timeframe is that its
> > > > basically
> > > > > > the
> > > > > > > > > > addition
> > > > > > > > > > > > of
> > > > > > > > > > > > >> every timeout, all the way down in the system,
> > > combined
> > > > > with
> > > > > > > > > > whatever
> > > > > > > > > > > > >> flushing / caching / etc times are at the
> > bookkeeper /
> > > > > > client
> > > > > > > > > level
> > > > > > > > > > > for
> > > > > > > > > > > > >> when values are exposed
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Gotcha.
> > > > > > > > > > > > >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Is there any ways to identify your write?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I can think of a case as follow - I want to know
> > > what
> > > > is
> > > > > > > your
> > > > > > > > > > > expected
> > > > > > > > > > > > >> > behavior from the log.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > a)
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If a hbase region server A writes a change of
> key
> > K
> > > to
> > > > > the
> > > > > > > > log,
> > > > > > > > > > the
> > > > > > > > > > > > >> change
> > > > > > > > > > > > >> > is successfully made to the log;
> > > > > > > > > > > > >> > but server A is down before receiving the
> change.
> > > > > > > > > > > > >> > region server B took over the region that
> contains
> > > K,
> > > > > what
> > > > > > > > will
> > > > > > > > > B
> > > > > > > > > > > do?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> HBase writes in large chunks (WAL Logs), which its
> > > > > > replication
> > > > > > > > > > system
> > > > > > > > > > > > then
> > > > > > > > > > > > >> handles by replaying in the case of failure. If
> I'm
> > > in a
> > > > > > > middle
> > > > > > > > > of a
> > > > > > > > > > > > log,
> > > > > > > > > > > > >> and the whole region goes down and gets
> rescheduled
> > > > > > > elsewhere, I
> > > > > > > > > > will
> > > > > > > > > > > > >> start
> > > > > > > > > > > > >> back up from the beginning of the log I was in the
> > > > middle
> > > > > > of.
> > > > > > > > > Using
> > > > > > > > > > > > >> checkpointing + deduping, we should be able to
> find
> > > out
> > > > > > where
> > > > > > > we
> > > > > > > > > > left
> > > > > > > > > > > > off
> > > > > > > > > > > > >> in the log.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > b) same as a). but server A was just network
> > > > > partitioned.
> > > > > > > will
> > > > > > > > > > both
> > > > > > > > > > > A
> > > > > > > > > > > > >> and B
> > > > > > > > > > > > >> > write the change of key K?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> HBase gives us some guarantees around network
> > > partitions
> > > > > > > > > > (Consistency
> > > > > > > > > > > > over
> > > > > > > > > > > > >> availability for HBase). HBase is a single-master
> > > > failover
> > > > > > > > > recovery
> > > > > > > > > > > type
> > > > > > > > > > > > >> of
> > > > > > > > > > > > >> system, with zookeeper-based guarantees for single
> > > > owners
> > > > > > > > > (writers)
> > > > > > > > > > > of a
> > > > > > > > > > > > >> range of data.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > 5) If A is turned into multiple batches (one
> > large
> > > > > > request
> > > > > > > > > gets
> > > > > > > > > > > > split
> > > > > > > > > > > > >> > into
> > > > > > > > > > > > >> > > multiple smaller ones to the bookkeeper
> backend,
> > > due
> > > > > to
> > > > > > > log
> > > > > > > > > > > rolling
> > > > > > > > > > > > /
> > > > > > > > > > > > >> > size
> > > > > > > > > > > > >> > > / etc):
> > > > > > > > > > > > >> > >   a) The ordering of entries within batches
> have
> > > > > > ordering
> > > > > > > > > > > > consistence
> > > > > > > > > > > > >> > with
> > > > > > > > > > > > >> > > the original request, when exposed in the log
> > > > (though
> > > > > > they
> > > > > > > > may
> > > > > > > > > > be
> > > > > > > > > > > > >> > > interleaved with other requests)
> > > > > > > > > > > > >> > >   b) The ordering across batches have ordering
> > > > > > consistence
> > > > > > > > > with
> > > > > > > > > > > the
> > > > > > > > > > > > >> > > original request, when exposed in the log
> > (though
> > > > they
> > > > > > may
> > > > > > > > be
> > > > > > > > > > > > >> interleaved
> > > > > > > > > > > > >> > > with other requests)
> > > > > > > > > > > > >> > >   c) If a batch fails, and cannot be retried /
> > is
> > > > > > > > > unsuccessfully
> > > > > > > > > > > > >> retried,
> > > > > > > > > > > > >> > > all batches after the failed batch should not
> be
> > > > > exposed
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > log.
> > > > > > > > > > > > >> > Note:
> > > > > > > > > > > > >> > > The batches before and including the failed
> > batch,
> > > > > that
> > > > > > > > ended
> > > > > > > > > up
> > > > > > > > > > > > >> > > succeeding, can show up in the log, again
> within
> > > > some
> > > > > > > > bounded
> > > > > > > > > > time
> > > > > > > > > > > > >> range
> > > > > > > > > > > > >> > > for reads by a client.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > There is a method 'writeBulk' in
> > > DistributedLogClient
> > > > > can
> > > > > > > > > achieve
> > > > > > > > > > > this
> > > > > > > > > > > > >> > guarantee.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > However, I am not very sure about how will you
> > turn
> > > A
> > > > > into
> > > > > > > > > > batches.
> > > > > > > > > > > If
> > > > > > > > > > > > >> you
> > > > > > > > > > > > >> > are dividing A into batches,
> > > > > > > > > > > > >> > you can simply control the application write
> > > sequence
> > > > to
> > > > > > > > achieve
> > > > > > > > > > the
> > > > > > > > > > > > >> > guarantee here.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Can you explain more about this?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> In this case, by batches I mean what the proxy
> does
> > > with
> > > > > the
> > > > > > > > > single
> > > > > > > > > > > > >> request
> > > > > > > > > > > > >> that I send it. If the proxy decides it needs to
> > turn
> > > my
> > > > > > > single
> > > > > > > > > > > request
> > > > > > > > > > > > >> into multiple batches of requests, due to log
> > rolling,
> > > > > size
> > > > > > > > > > > limitations,
> > > > > > > > > > > > >> etc, those would be the guarantees I need to be
> able
> > > to
> > > > > > > > > reduplicate
> > > > > > > > > > on
> > > > > > > > > > > > the
> > > > > > > > > > > > >> client side.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > A single record written by #write and A record set
> > (set
> > > > of
> > > > > > > > records)
> > > > > > > > > > > > > written by #writeRecordSet are atomic - they will
> not
> > > be
> > > > > > broken
> > > > > > > > > down
> > > > > > > > > > > into
> > > > > > > > > > > > > entries (batches). With the correct response code,
> > you
> > > > > would
> > > > > > be
> > > > > > > > > able
> > > > > > > > > > to
> > > > > > > > > > > > > tell if it is a lost-ack failure or not. However there is a size
> > > > > > > > > > > > > limitation for this - it cannot go beyond 1MB in the current
> > > > > > > > > > > > > implementation.
> > > > > > > > > > > > >
> > > > > > > > > > > > > What is your expected record size?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Since we can guarantee per-key ordering on the
> > > > client
> > > > > > > side,
> > > > > > > > we
> > > > > > > > > > > > >> guarantee
> > > > > > > > > > > > >> > > that there is a single writer per-key, just
> not
> > > per
> > > > > log.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Do you need fencing guarantee in the case of
> > network
> > > > > > > partition
> > > > > > > > > > > causing
> > > > > > > > > > > > >> > two-writers?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > So if there was a
> > > > > > > > > > > > >> > > way to guarantee a single write request as
> being
> > > > > written
> > > > > > > or
> > > > > > > > > not,
> > > > > > > > > > > > >> within a
> > > > > > > > > > > > >> > > certain time frame (since failures should be
> > rare
> > > > > > anyways,
> > > > > > > > > this
> > > > > > > > > > is
> > > > > > > > > > > > >> fine
> > > > > > > > > > > > >> > if
> > > > > > > > > > > > >> > > it is expensive), we can then have the client
> > > > > guarantee
> > > > > > > the
> > > > > > > > > > > ordering
> > > > > > > > > > > > >> it
> > > > > > > > > > > > >> > > needs.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > This sounds an 'exact-once' write (regarding
> > > retries)
> > > > > > > > > requirement
> > > > > > > > > > to
> > > > > > > > > > > > me,
> > > > > > > > > > > > >> > right?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> Yes. I'm curious of how this issue is handled by
> > > > > Manhattan,
> > > > > > > > since
> > > > > > > > > > you
> > > > > > > > > > > > can
> > > > > > > > > > > > >> imagine a data store that ends up getting multiple
> > > > writes
> > > > > > for
> > > > > > > > the
> > > > > > > > > > same
> > > > > > > > > > > > put
> > > > > > > > > > > > >> / get / etc, would be harder to use, and we are
> > > > basically
> > > > > > > trying
> > > > > > > > > to
> > > > > > > > > > > > create
> > > > > > > > > > > > >> a log like that for HBase.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Are you guys replacing HBase WAL?
> > > > > > > > > > > > >
> > > > > > > > > > > > > In Manhattan case, the request will be first
> written
> > to
> > > > DL
> > > > > > > > streams
> > > > > > > > > by
> > > > > > > > > > > > > Manhattan coordinator. The Manhattan replica then
> > will
> > > > read
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > DL
> > > > > > > > > > > > > streams and apply the change. In the lost-ack case,
> > the
> > > > MH
> > > > > > > > > > coordinator
> > > > > > > > > > > > will
> > > > > > > > > > > > > just fail the request to client.
> > > > > > > > > > > > >
> > > > > > > > > > > > > My feeling here is your usage for HBase is a bit
> > > > different
> > > > > > from
> > > > > > > > how
> > > > > > > > > > we
> > > > > > > > > > > > use
> > > > > > > > > > > > > DL in Manhattan. It sounds like you read from a
> > source
> > > > > (HBase
> > > > > > > > WAL)
> > > > > > > > > > and
> > > > > > > > > > > > > write to DL. But I might be wrong.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > Cameron:
> > > > > > > > > > > > >> > > > Another thing we've discussed but haven't
> > really
> > > > > > thought
> > > > > > > > > > > through -
> > > > > > > > > > > > >> > > > We might be able to support some kind of
> epoch
> > > > write
> > > > > > > > > request,
> > > > > > > > > > > > where
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > > > epoch is guaranteed to have changed if the
> > > writer
> > > > > has
> > > > > > > > > changed
> > > > > > > > > > or
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > ledger
> > > > > > > > > > > > >> > > > was ever fenced off. Writes include an epoch
> > and
> > > > are
> > > > > > > > > rejected
> > > > > > > > > > if
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > epoch
> > > > > > > > > > > > >> > > > has changed.
> > > > > > > > > > > > >> > > > With a mechanism like this, fencing the
> ledger
> > > off
> > > > > > > after a
> > > > > > > > > > > failure
> > > > > > > > > > > > >> > would
> > > > > > > > > > > > >> > > > ensure any pending writes had either been
> > > written
> > > > or
> > > > > > > would
> > > > > > > > > be
> > > > > > > > > > > > >> rejected.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > The issue would be how I guarantee the write I
> > > wrote
> > > > > to
> > > > > > > the
> > > > > > > > > > server
> > > > > > > > > > > > was
> > > > > > > > > > > > >> > > written. Since a network issue could happen on
> > the
> > > > > send
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > > >> request,
> > > > > > > > > > > > >> > or
> > > > > > > > > > > > >> > > on the receive of the success response, an
> epoch
> > > > > > wouldn't
> > > > > > > > tell
> > > > > > > > > > me
> > > > > > > > > > > > if I
> > > > > > > > > > > > >> > can
> > > > > > > > > > > > >> > > successfully retry, as it could be
> successfully
> > > > > written
> > > > > > > but
> > > > > > > > > AWS
> > > > > > > > > > > > >> dropped
> > > > > > > > > > > > >> > the
> > > > > > > > > > > > >> > > connection for the success response. Since the
> > > epoch
> > > > > > would
> > > > > > > > be
> > > > > > > > > > the
> > > > > > > > > > > > same
> > > > > > > > > > > > >> > > (same ledger), I could write duplicates.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > We are currently proposing adding a
> > transaction
> > > > > > semantic
> > > > > > > > to
> > > > > > > > > dl
> > > > > > > > > > > to
> > > > > > > > > > > > >> get
> > > > > > > > > > > > >> > rid
> > > > > > > > > > > > >> > > > of the size limitation and the unaware-ness
> in
> > > the
> > > > > > proxy
> > > > > > > > > > client.
> > > > > > > > > > > > >> Here
> > > > > > > > > > > > >> > is
> > > > > > > > > > > > >> > > > our idea -
> > > > > > > > > > > > >> > > > http://mail-archives.apache.
> > > > org/mod_mbox/incubator-
> > > > > > > > > > > distributedlog
> > > > > > > > > > > > >> > > -dev/201609.mbox/%
> 3cCAAC6BxP5YyEHwG0ZCF5soh42X=
> > > > xuYwYm
> > > > > > > > > > > > >> > > <http://mail-archives.apache.
> > > > org/mod_mbox/incubator-
> > > > > > > > > > > > >> > distributedlog%0A-dev/201609.mbox/%
> > > > > > > 3cCAAC6BxP5YyEHwG0ZCF5soh
> > > > > > > > > > > > 42X=xuYwYm>
> > > > > > > > > > > > >> > > L4nXsYBYiofzxpVk6g@mail.gmail.com%3e
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > I am not sure if your idea is similar as
> ours.
> > > but
> > > > > > we'd
> > > > > > > > like
> > > > > > > > > > to
> > > > > > > > > > > > >> > > collaborate
> > > > > > > > > > > > >> > > > with the community if anyone has the similar
> > > idea.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Our use case would be covered by transaction
> > > > support,
> > > > > > but
> > > > > > > > I'm
> > > > > > > > > > > unsure
> > > > > > > > > > > > >> if
> > > > > > > > > > > > >> > we
> > > > > > > > > > > > >> > > would need something that heavy weight for the
> > > > > > guarantees
> > > > > > > we
> > > > > > > > > > need.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Basically, the high level requirement here is
> > > > "Support
> > > > > > > > > > consistent
> > > > > > > > > > > > >> write
> > > > > > > > > > > > >> > > ordering for single-writer-per-key,
> > > > > > multi-writer-per-log".
> > > > > > > > My
> > > > > > > > > > > hunch
> > > > > > > > > > > > is
> > > > > > > > > > > > >> > > that, with some added guarantees to the proxy
> > (if
> > > it
> > > > > > isn't
> > > > > > > > > > already
> > > > > > > > > > > > >> > > supported), and some custom client code on our
> > > side
> > > > > for
> > > > > > > > > removing
> > > > > > > > > > > the
> > > > > > > > > > > > >> > > entries that actually succeed to write to
> > > > > DistributedLog
> > > > > > > > from
> > > > > > > > > > the
> > > > > > > > > > > > >> request
> > > > > > > > > > > > >> > > that failed, it should be a relatively easy
> > thing
> > > to
> > > > > > > > support.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Yup. I think it should not be very difficult to
> > > > support.
> > > > > > > There
> > > > > > > > > > might
> > > > > > > > > > > > be
> > > > > > > > > > > > >> > some changes in the server side.
> > > > > > > > > > > > >> > Let's figure out what will the changes be. Are
> you
> > > > guys
> > > > > > > > > interested
> > > > > > > > > > > in
> > > > > > > > > > > > >> > contributing?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Yes, we would be.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> As a note, the one thing that we see as an issue
> > with
> > > > the
> > > > > > > client
> > > > > > > > > > side
> > > > > > > > > > > > >> dedupping is how to bound the range of data that
> > needs
> > > > to
> > > > > be
> > > > > > > > > looked
> > > > > > > > > > at
> > > > > > > > > > > > for
> > > > > > > > > > > > >> deduplication. As you can imagine, it is pretty
> easy
> > > to
> > > > > > bound
> > > > > > > > the
> > > > > > > > > > > bottom
> > > > > > > > > > > > >> of
> > > > > > > > > > > > >> the range, as that it just regular checkpointing
> of
> > > the
> > > > > DSLN
> > > > > > > > that
> > > > > > > > > is
> > > > > > > > > > > > >> returned. I'm still not sure if there is any nice
> > way
> > > to
> > > > > > time
> > > > > > > > > bound
> > > > > > > > > > > the
> > > > > > > > > > > > >> top
> > > > > > > > > > > > >> end of the range, especially since the proxy owns
> > > > sequence
> > > > > > > > numbers
> > > > > > > > > > > > (which
> > > > > > > > > > > > >> makes sense). I am curious if there is more that
> can
> > > be
> > > > > done
> > > > > > > if
> > > > > > > > > > > > >> deduplication is on the server side. However the
> > main
> > > > > minus
> > > > > > I
> > > > > > > > see
> > > > > > > > > of
> > > > > > > > > > > > >> server
> > > > > > > > > > > > >> side deduplication is that instead of running
> > > contingent
> > > > > on
> > > > > > > > there
> > > > > > > > > > > being
> > > > > > > > > > > > a
> > > > > > > > > > > > >> failed client request, instead it would have to
> run
> > > > every
> > > > > > > time a
> > > > > > > > > > write
> > > > > > > > > > > > >> happens.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > For a reliable dedup, we probably need
> > > > > fence-then-getLastDLSN
> > > > > > > > > > > operation -
> > > > > > > > > > > > > so it would guarantee that any non-completed
> requests
> > > > > issued
> > > > > > > > > > (lost-ack
> > > > > > > > > > > > > requests) before this fence-then-getLastDLSN
> > operation
> > > > will
> > > > > > be
> > > > > > > > > failed
> > > > > > > > > > > and
> > > > > > > > > > > > > they will never land at the log.
> > > > > > > > > > > > >
> > > > > > > > > > > > > the pseudo code would look like below -
> > > > > > > > > > > > >
> > > > > > > > > > > > > write(request) onFailure { t =>
> > > > > > > > > > > > >
> > > > > > > > > > > > > if (t is timeout exception) {
> > > > > > > > > > > > >
> > > > > > > > > > > > > DLSN lastDLSN = fenceThenGetLastDLSN()
> > > > > > > > > > > > > DLSN lastCheckpointedDLSN = ...;
> > > > > > > > > > > > > // find if the request lands between [lastCheckpointedDLSN, lastDLSN].
> > > > > > > > > > > > > // if it is found, the write succeeded; otherwise retry.
> retry.
> > > > > > > > > > > > >
> > > > > > > > > > > > > }
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > }
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Just realized the idea is the same as what Leigh raised in the
> > > > > > > > > > > > previous email about 'epoch write'. Let me explain more about this
> > > > > > > > > > > > idea (Leigh, feel free to jump in and fill in your idea).
> > > > > > > > > > > >
> > > > > > > > > > > > - when a log stream is owned, the proxy uses the last transaction id
> > > > > > > > > > > > as the epoch
> > > > > > > > > > > > - when a client connects (handshakes with the proxy), it will get the
> > > > > > > > > > > > epoch for the stream.
> > > > > > > > > > > > - the writes issued by this client will carry the epoch to the proxy.
> > > > > > > > > > > > - add a new rpc - fenceThenGetLastDLSN - it would force the proxy to
> > > > > > > > > > > > bump the epoch.
> > > > > > > > > > > > - if fenceThenGetLastDLSN happens, all the outstanding writes with the
> > > > > > > > > > > > old epoch will be rejected with exceptions (e.g. EpochFenced).
> > > > > > > > > > > > - the DLSN returned from fenceThenGetLastDLSN can be used as the bound
> > > > > > > > > > > > for deduplications on failures.
> > > > > > > > > > > >
> > > > > > > > > > > > Cameron, does this sound like a solution to your use case?
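
(To make the flow above concrete, a rough sketch from the client's perspective -
the epoch argument to write(), EpochFencedException and fenceThenGetLastDLSN are
the additions proposed in this thread, not existing APIs:)

// Sketch of the proposed epoch-write flow (proposed APIs, not existing ones).
long epoch = client.handshake(streamName).getEpoch();            // 1. learn the current epoch

Future<DLSN> pending = client.write(streamName, payload, epoch); // 2. writes carry the epoch

// 3. on a lost-ack failure, fence the stream: the epoch is bumped, so any
//    outstanding write carrying the old epoch is rejected (EpochFenced).
DLSN fenceDLSN = client.fenceThenGetLastDLSN(streamName);

// 4. de-duplicate by scanning (lastCheckpointedDLSN, fenceDLSN] for the
//    payload, and retry with the new epoch only if it is not found.
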
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Maybe something that could fit a similar need that
> > > Kafka
> > > > > > does
> > > > > > > > (the
> > > > > > > > > > > last
> > > > > > > > > > > > >> store value for a particular key in a log), such
> > that
> > > > on a
> > > > > > per
> > > > > > > > key
> > > > > > > > > > > basis
> > > > > > > > > > > > >> there could be a sequence number that support
> > > > > deduplication?
> > > > > > > > Cost
> > > > > > > > > > > seems
> > > > > > > > > > > > >> like it would be high however, and I'm not even
> sure
> > > if
> > > > > > > > bookkeeper
> > > > > > > > > > > > >> supports
> > > > > > > > > > > > >> it.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Cheers,
> > > > > > > > > > > > >> Cameron
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > >> > > Cameron
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart
> > > > > > > > > > > > >> > <lstewart@twitter.com.invalid
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > Cameron:
> > > > > > > > > > > > >> > > > Another thing we've discussed but haven't
> > really
> > > > > > thought
> > > > > > > > > > > through -
> > > > > > > > > > > > >> > > > We might be able to support some kind of
> epoch
> > > > write
> > > > > > > > > request,
> > > > > > > > > > > > where
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > > > epoch is guaranteed to have changed if the
> > > writer
> > > > > has
> > > > > > > > > changed
> > > > > > > > > > or
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > ledger
> > > > > > > > > > > > >> > > > was ever fenced off. Writes include an epoch
> > and
> > > > are
> > > > > > > > > rejected
> > > > > > > > > > if
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > epoch
> > > > > > > > > > > > >> > > > has changed.
> > > > > > > > > > > > >> > > > With a mechanism like this, fencing the
> ledger
> > > off
> > > > > > > after a
> > > > > > > > > > > failure
> > > > > > > > > > > > >> > would
> > > > > > > > > > > > >> > > > ensure any pending writes had either been
> > > written
> > > > or
> > > > > > > would
> > > > > > > > > be
> > > > > > > > > > > > >> rejected.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <
> > > > > > > > sijie@apache.org
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > > Cameron,
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > I think both Leigh and Xi had made a few
> > good
> > > > > points
> > > > > > > > about
> > > > > > > > > > > your
> > > > > > > > > > > > >> > > question.
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > To add one more point to your question -
> > "but
> > > I
> > > > am
> > > > > > not
> > > > > > > > > > > > >> > > > > 100% of how all of the futures in the code
> > > > handle
> > > > > > > > > failures.
> > > > > > > > > > > > >> > > > > If not, where in the code would be the
> > > relevant
> > > > > > places
> > > > > > > > to
> > > > > > > > > > add
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > ability
> > > > > > > > > > > > >> > > > > to do this, and would the project be
> > > interested
> > > > > in a
> > > > > > > > pull
> > > > > > > > > > > > >> request?"
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > The current proxy and client logic doesn't
> > do
> > > > > > > perfectly
> > > > > > > > on
> > > > > > > > > > > > >> handling
> > > > > > > > > > > > >> > > > > failures (duplicates) - the strategy now
> is
> > > the
> > > > > > client
> > > > > > > > > will
> > > > > > > > > > > > retry
> > > > > > > > > > > > >> as
> > > > > > > > > > > > >> > > best
> > > > > > > > > > > > >> > > > > at it can before throwing exceptions to
> > users.
> > > > The
> > > > > > > code
> > > > > > > > > you
> > > > > > > > > > > are
> > > > > > > > > > > > >> > looking
> > > > > > > > > > > > >> > > > for
> > > > > > > > > > > > >> > > > > - it is on BKLogSegmentWriter for the
> proxy
> > > > > handling
> > > > > > > > > writes
> > > > > > > > > > > and
> > > > > > > > > > > > >> it is
> > > > > > > > > > > > >> > > on
> > > > > > > > > > > > >> > > > > DistributedLogClientImpl for the proxy
> > client
> > > > > > handling
> > > > > > > > > > > responses
> > > > > > > > > > > > >> from
> > > > > > > > > > > > >> > > > > proxies. Does this help you?
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > And also, you are welcome to contribute
> the
> > > pull
> > > > > > > > requests.
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > - Sijie
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron
> > > > Hatfield <
> > > > > > > > > > > > >> kinguy@gmail.com>
> > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > > I have a question about the Proxy
> Client.
> > > > > > Basically,
> > > > > > > > for
> > > > > > > > > > our
> > > > > > > > > > > > use
> > > > > > > > > > > > >> > > cases,
> > > > > > > > > > > > >> > > > > we
> > > > > > > > > > > > >> > > > > > want to guarantee ordering at the key
> > level,
> > > > > > > > > irrespective
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > > > ordering
> > > > > > > > > > > > >> > > > > > of the partition it may be assigned to
> as
> > a
> > > > > whole.
> > > > > > > Due
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > >> > source
> > > > > > > > > > > > >> > > of
> > > > > > > > > > > > >> > > > > the
> > > > > > > > > > > > >> > > > > > data (HBase Replication), we cannot
> > > guarantee
> > > > > > that a
> > > > > > > > > > single
> > > > > > > > > > > > >> > partition
> > > > > > > > > > > > >> > > > > will
> > > > > > > > > > > > >> > > > > > be owned for writes by the same client.
> > This
> > > > > means
> > > > > > > the
> > > > > > > > > > proxy
> > > > > > > > > > > > >> client
> > > > > > > > > > > > >> > > > works
> > > > > > > > > > > > >> > > > > > well (since we don't care which proxy
> owns
> > > the
> > > > > > > > partition
> > > > > > > > > > we
> > > > > > > > > > > > are
> > > > > > > > > > > > >> > > writing
> > > > > > > > > > > > >> > > > > > to).
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > However, the guarantees we need when
> > > writing a
> > > > > > batch
> > > > > > > > > > > consists
> > > > > > > > > > > > >> of:
> > > > > > > > > > > > >> > > > > > Definition of a Batch: The set of
> records
> > > sent
> > > > > to
> > > > > > > the
> > > > > > > > > > > > writeBatch
> > > > > > > > > > > > >> > > > endpoint
> > > > > > > > > > > > >> > > > > > on the proxy
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 1. Batch success: If the client
> receives a
> > > > > success
> > > > > > > > from
> > > > > > > > > > the
> > > > > > > > > > > > >> proxy,
> > > > > > > > > > > > >> > > then
> > > > > > > > > > > > >> > > > > > that batch is successfully written
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 2. Inter-Batch ordering : Once a batch
> has
> > > > been
> > > > > > > > written
> > > > > > > > > > > > >> > successfully
> > > > > > > > > > > > >> > > by
> > > > > > > > > > > > >> > > > > the
> > > > > > > > > > > > >> > > > > > client, when another batch is written,
> it
> > > will
> > > > > be
> > > > > > > > > > guaranteed
> > > > > > > > > > > > to
> > > > > > > > > > > > >> be
> > > > > > > > > > > > >> > > > > ordered
> > > > > > > > > > > > >> > > > > > after the last batch (if it is the same
> > > > stream).
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 3. Intra-Batch ordering: Within a batch
> of
> > > > > writes,
> > > > > > > the
> > > > > > > > > > > records
> > > > > > > > > > > > >> will
> > > > > > > > > > > > >> > > be
> > > > > > > > > > > > >> > > > > > committed in order
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 4. Intra-Batch failure ordering: If an
> > > > > individual
> > > > > > > > record
> > > > > > > > > > > fails
> > > > > > > > > > > > >> to
> > > > > > > > > > > > >> > > write
> > > > > > > > > > > > >> > > > > > within a batch, all records after that
> > > record
> > > > > will
> > > > > > > not
> > > > > > > > > be
> > > > > > > > > > > > >> written.
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 5. Batch Commit: Guarantee that if a
> batch
> > > > > > returns a
> > > > > > > > > > > success,
> > > > > > > > > > > > it
> > > > > > > > > > > > >> > will
> > > > > > > > > > > > >> > > > be
> > > > > > > > > > > > >> > > > > > written
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > 6. Read-after-write: Once a batch is
> > > > committed,
> > > > > > > > within a
> > > > > > > > > > > > limited
> > > > > > > > > > > > >> > > > > time-frame
> > > > > > > > > > > > >> > > > > > it will be able to be read. This is
> > required
> > > > in
> > > > > > the
> > > > > > > > case
> > > > > > > > > > of
> > > > > > > > > > > > >> > failure,
> > > > > > > > > > > > >> > > so
> > > > > > > > > > > > >> > > > > > that the client can see what actually
> got
> > > > > > > committed. I
> > > > > > > > > > > believe
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > > > > > time-frame part could be removed if the
> > > client
> > > > > can
> > > > > > > > send
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > >> same
> > > > > > > > > > > > >> > > > > > sequence number that was written
> > previously,
> > > > > since
> > > > > > > it
> > > > > > > > > > would
> > > > > > > > > > > > then
> > > > > > > > > > > > >> > fail
> > > > > > > > > > > > >> > > > and
> > > > > > > > > > > > >> > > > > > we would know that a read needs to
> occur.
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > So, my basic question is if this is
> > > currently
> > > > > > > possible
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > >> > proxy?
> > > > > > > > > > > > >> > > I
> > > > > > > > > > > > >> > > > > > don't believe it gives these guarantees
> as
> > > it
> > > > > > stands
> > > > > > > > > > today,
> > > > > > > > > > > > but
> > > > > > > > > > > > >> I
> > > > > > > > > > > > >> > am
> > > > > > > > > > > > >> > > > not
> > > > > > > > > > > > >> > > > > > 100% of how all of the futures in the
> code
> > > > > handle
> > > > > > > > > > failures.
> > > > > > > > > > > > >> > > > > > If not, where in the code would be the
> > > > relevant
> > > > > > > places
> > > > > > > > > to
> > > > > > > > > > > add
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> > > > ability
> > > > > > > > > > > > >> > > > > > to do this, and would the project be
> > > > interested
> > > > > > in a
> > > > > > > > > pull
> > > > > > > > > > > > >> request?
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > >> > > > > > Cameron
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
