distributedlog-dev mailing list archives

From Cameron Hatfield <kin...@gmail.com>
Subject Re: Proxy Client - Batch Ordering / Commit
Date Wed, 16 Nov 2016 19:59:30 GMT
"A couple of questions" is what I originally wrote, and then the following
happened. Sorry about the large swath of them, making sure my understanding
of the code base, as well as the DL/Bookkeeper/ZK ecosystem interaction,
makes sense.

==General:
What is an exclusive session? What does it provide over a regular session?


==Proxy side:
Should a new streamop be added for the fencing operation, or does it make
sense to piggyback on an existing one (such as write)?

====getLastDLSN:
What should the return result be for:
- a new stream?
- a new session, after successful fencing?
- a new session, after a change in ownership / first starting up?

What is the main use case for getLastDLSN(<stream>, false)? Is it to
guarantee that the recovery process has happened in case of ownership
failure (I don't have a good understanding of what causes the recovery
process to happen, especially from the reader side), or is it to handle the
lost-ack problem? Since all the other read-related operations go through the
read client, I'm not sure I see the use case, and it seems like there would
be a large potential for confusion over which call to use. What about just a
fenceSession op that always fences and returns the DLSN of the fence,
leaving the normal getLastDLSN to the regular read client?
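
To make the distinction concrete, here is a rough Java sketch of how I
picture the two operations if fencing were split out. Neither method exists
today; the names, the use of CompletableFuture, and the long "position"
return type are all placeholders on my part.

import java.util.concurrent.CompletableFuture;

// Hypothetical proxy-side operations, for discussion only.
interface SessionOps {
    // Plain read-path query: returns the last committed position without
    // touching the session state at all.
    CompletableFuture<Long> getLastDLSN(String stream);

    // Always fences: bumps the session, writes the fence control record,
    // and returns the position of that record so the client can bound
    // its dedup scan.
    CompletableFuture<Long> fenceSession(String stream);
}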

====Fencing:
When a fence session occurs, what call needs to be made to ensure any
outstanding writes are flushed and committed (so that we guarantee the
client will be able to read anything that was in the write queue)?
Is there a guaranteed ordering for things written in the future queue for
AsyncLogWriter? (I'm not quite confident that I was able to accurately
follow the logic, as there are many parts of the code that write, have
queues, heartbeat, etc.)
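
In other words, I would expect the fence to look roughly like the sketch
below, where the fence only completes after the queued writes are flushed
and committed. flushPendingWrites and appendFenceControlRecord are made-up
names, not existing DL calls; this is just to make the ordering question
concrete.

import java.util.concurrent.CompletableFuture;

// Hypothetical fence path, for discussion only.
final class FenceSequence {

    interface PendingWrites {
        CompletableFuture<Void> flushPendingWrites();        // drain + commit queued writes
        CompletableFuture<Long> appendFenceControlRecord();  // returns the fence position
    }

    // A reader that sees the returned fence position is then guaranteed to
    // also see anything the old session still had sitting in the queue.
    CompletableFuture<Long> fence(PendingWrites writes) {
        return writes.flushPendingWrites()
                     .thenCompose(done -> writes.appendFenceControlRecord());
    }
}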

====SessionID:
What is the default sessionid / transactionid for a new stream? I assume
this would just be the first control record.

======Should all streams have a sessionid by default, regardless of whether
it is ever used by a client (i.e., every time ownership changes, a new
control record is generated and a sessionid is stored)?
The main edge case that would have to be handled is a client writing with an
old sessionid when the owner has changed and has yet to create a sessionid.
This should be handled by the "non-matching sessionid" rule, since the
invalid sessionid wouldn't match the passed sessionid, which should cause
the client to get a new sessionid.
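
A minimal sketch of that rule as I understand it (all names are made up;
the real proxy would hold this state per stream):

// Sketch of the "non-matching sessionid" rule, for discussion only.
final class SessionCheck {
    static final long NO_SESSION = -1L;       // new owner, no control record written yet
    private long currentSessionId = NO_SESSION;

    // A stream that has not yet established a session rejects every
    // session-scoped write, which forces the client to fence / re-handshake
    // and pick up a fresh sessionid.
    synchronized boolean accept(long clientSessionId) {
        return currentSessionId != NO_SESSION && clientSessionId == currentSessionId;
    }

    // Called when the fence control record is written for this stream.
    synchronized void fence(long newSessionId) {
        currentSessionId = newSessionId;
    }
}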

======Where in the code does it make sense to own the session: the stream
interfaces / classes? Should they pass that information down to the ops, or
should the ops do the sessionid check themselves?
My first thought would be that Stream owns the sessionid and passes it into
the ops (as either a nullable value or an invalid default value), which then
do the sessionid check if they care. The main issue is that updating the
sessionid is a bit backwards: either every op has the ability to update it
through some type of return value / direct stream access / etc., or there is
a special case in the stream for the fence operation / any other operation
that can update the session.
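
Roughly, I'm picturing something like the following, where the awkward part
is the return-value channel for ops (such as the fence) that need to hand a
new sessionid back to the Stream. Again, all names here are hypothetical.

import java.util.OptionalLong;

// Hypothetical shape of "Stream owns the sessionid, ops receive it".
interface StreamOp {
    // Executes against the current session; returns a new sessionid only if
    // this op (e.g. a fence) changed it, otherwise empty.
    OptionalLong execute(long currentSessionId);
}

final class Stream {
    private long sessionId;

    Stream(long initialSessionId) {
        this.sessionId = initialSessionId;
    }

    void submit(StreamOp op) {
        // The Stream stays the single writer of sessionId; ops only read it
        // and may propose an update through their return value.
        op.execute(sessionId).ifPresent(updated -> sessionId = updated);
    }
}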

======For "the owner of the log stream will first advance the transaction
id generator to claim a new transaction id and write a control record to
the log stream. ":
Should "DistributedLogConstants.CONTROL_RECORD_CONTENT" be the type of
control record written?
Should the "writeControlRecord" on the BKAsyncLogWriter be exposed in the
AsyncLogWriter interface be exposed?  Or even in the one within the segment
writer? Or should the code be duplicated / pulled out into a helper / etc?
(Not a big Java person, so any suggestions on the "Java Way", or at least
the DL way, to do it would be appreciated)
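
For reference, the kind of interface change I have in mind is sketched
below. The method name mirrors the existing BKAsyncLogWriter one, but the
signature, the long positions, and the idea of passing the freshly claimed
transaction id in are all guesses on my part.

import java.util.concurrent.CompletableFuture;

// Hypothetical extension of the async writer interface, for discussion only.
interface AsyncLogWriterWithControl {
    CompletableFuture<Long> write(byte[] record);

    // Appends a control record carrying the claimed transaction id; the
    // returned position would become the session's fence point.
    CompletableFuture<Long> writeControlRecord(long claimedTxId);
}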

======Transaction ID:
The BKLogSegmentWriter ignores the transaction ids from control records when
it records the "lastTxId". Would that be an issue here for anything? It
looks like it may do that because it assumes you're calling its local
function for writing a control record, which uses the lastTxId.


==Thrift Interface:
====Should the write response be split out for different calls?
It seems odd to have a single struct with many optional fields that are
filled in depending on the call made, for every RPC call. This is mostly a
curiosity question, since I assume it comes from general practice after
using Thrift for a while. Would it at least make sense for the
getLastDLSN/fence endpoint to have its own struct?

====Any particular error code that makes sense for "session fenced"? If we
want to stay close to the HTTP errors, it looks like 412 (Precondition
Failed) might make the most sense, if a bit generic.

412 def:
"The precondition given in one or more of the request-header fields
evaluated to false when it was tested on the server. This response code
allows the client to place preconditions on the current resource
metainformation (header field data) and thus prevent the requested method
from being applied to a resource other than the one intended."

412 excerpt from the If-match doc:
"This behavior is most useful when the client wants to prevent an updating
method, such as PUT, from modifying a resource that has changed since the
client last retrieved it."

====Should we return the sessionid to the client in the "fencesession"
calls?
It seems like it may be useful when you fence, especially if you have some
type of custom sequencer where it would make sense for search, or for
debugging.
The main downside is that it would be easy for users to create an implicit
assumption that the sessionid is forever a valid transactionid, which may
not always be the case long term for the project.


==Client:
====What is the proposed process for the client to retrieve the new
sessionid?
A full reconnect? No special-case code, but intrusive on the client side,
and possibly expensive garbage/processing-wise (though this type of failure
should hopefully be rare enough not to be a problem).
A call to reset the sessionid? Less intrusive, but it comes with all the
issues of mutable object methods that need to be called in a certain order,
plus edge cases such as outstanding/buffered requests to the old stream,
etc. The call could also return the new sessionid, making it a good place to
capture the value for storage or debugging.
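
To show what I mean by the second option, here's a rough client-side sketch.
SessionFencedException, resetSession, and readSince are all made-up names,
and the dedup step assumes the last checkpointed position bounds the scan.

import java.util.Arrays;
import java.util.List;

// Hypothetical client-side handling of a session-fenced write failure.
final class SessionAwareWriter {

    interface ProxyClient {
        void write(long sessionId, byte[] record) throws SessionFencedException;
        long resetSession();                 // returns the new sessionid
        List<byte[]> readSince(long dlsn);   // records committed since a checkpoint
    }

    static final class SessionFencedException extends Exception {}

    private final ProxyClient client;
    private long sessionId;

    SessionAwareWriter(ProxyClient client, long initialSessionId) {
        this.client = client;
        this.sessionId = initialSessionId;
    }

    void writeWithRecovery(byte[] record, long checkpointDlsn) throws SessionFencedException {
        try {
            client.write(sessionId, record);
        } catch (SessionFencedException e) {
            // The session changed under us: pick up the new sessionid, then
            // dedup against everything committed since the last checkpoint
            // before deciding whether the failed write needs a retry.
            sessionId = client.resetSession();
            boolean alreadyWritten = client.readSince(checkpointDlsn).stream()
                    .anyMatch(r -> Arrays.equals(r, record));
            if (!alreadyWritten) {
                client.write(sessionId, record);
            }
        }
    }
}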

====Session Fenced failure:
Will this put the client into a failure state, stopping all future writes
until fixed?
Is it even possible to get this error when ownership changes? The
connection to the new owner should get a new sessionid on connect, so I
would expect not.

Cheers,
Cameron

On Tue, Nov 15, 2016 at 2:01 AM, Xi Liu <xi.liu.ant@gmail.com> wrote:

> Thank you, Cameron. Look forward to your comments.
>
> - Xi
>
> On Sun, Nov 13, 2016 at 1:21 PM, Cameron Hatfield <kinguy@gmail.com>
> wrote:
>
> > Sorry, I've been on vacation for the past week, and heads down for a
> > release that is using DL at the end of Nov. I'll take a look at this over
> > the next week, and add any relevant comments. After we are finished with
> > dev for this release, I am hoping to tackle this next.
> >
> > -Cameron
> >
> > On Fri, Nov 11, 2016 at 12:07 PM, Sijie Guo <sijie@apache.org> wrote:
> >
> > > Xi,
> > >
> > > Thank you so much for your proposal. I took a look. It looks fine to
> me.
> > > Cameron, do you have any comments?
> > >
> > > Look forward to your pull requests.
> > >
> > > - Sijie
> > >
> > >
> > > On Wed, Nov 9, 2016 at 2:34 AM, Xi Liu <xi.liu.ant@gmail.com> wrote:
> > >
> > > > Cameron,
> > > >
> > > > Have you started any work for this? I just updated the proposal page
> -
> > > > https://cwiki.apache.org/confluence/display/DL/DP-2+-+
> > > Epoch+Write+Support
> > > > Maybe we can work together with this.
> > > >
> > > > Sijie, Leigh,
> > > >
> > > > can you guys help review this to make sure our proposal is in the
> right
> > > > direction?
> > > >
> > > > - Xi
> > > >
> > > > On Tue, Nov 1, 2016 at 3:05 AM, Sijie Guo <sijie@apache.org> wrote:
> > > >
> > > > > I created https://issues.apache.org/jira/browse/DL-63 for tracking
> > the
> > > > > proposed idea here.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Oct 26, 2016 at 4:53 PM, Sijie Guo
> > <sijieg@twitter.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > On Tue, Oct 25, 2016 at 11:30 AM, Cameron Hatfield <
> > kinguy@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Yes, we are reading the HBase WAL (from their replication
> plugin
> > > > > > support),
> > > > > > > and writing that into DL.
> > > > > > >
> > > > > >
> > > > > > Gotcha.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > From the sounds of it, yes, it would. Only thing I would say is
> > > make
> > > > > the
> > > > > > > epoch requirement optional, so that if I client doesn't care
> > about
> > > > > dupes
> > > > > > > they don't have to deal with the process of getting a new
> epoch.
> > > > > > >
> > > > > >
> > > > > > Yup. This should be optional. I can start a wiki page on how we
> > want
> > > to
> > > > > > implement this. Are you interested in contributing to this?
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > -Cameron
> > > > > > >
> > > > > > > On Wed, Oct 19, 2016 at 7:43 PM, Sijie Guo
> > > > <sijieg@twitter.com.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <
> sijieg@twitter.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Monday, October 17, 2016, Cameron Hatfield <
> > > kinguy@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Answer inline:
> > > > > > > > >>
> > > > > > > > >> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <
> > sijie@apache.org
> > > >
> > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Cameron,
> > > > > > > > >> >
> > > > > > > > >> > Thank you for your summary. I liked the discussion
> here. I
> > > > also
> > > > > > > liked
> > > > > > > > >> the
> > > > > > > > >> > summary of your requirement - 'single-writer-per-key,
> > > > > > > > >> > multiple-writers-per-log'. If I understand correctly,
> the
> > > core
> > > > > > > concern
> > > > > > > > >> here
> > > > > > > > >> > is almost 'exact-once' write (or a way to explicit tell
> > if a
> > > > > write
> > > > > > > can
> > > > > > > > >> be
> > > > > > > > >> > retried or not).
> > > > > > > > >> >
> > > > > > > > >> > Comments inline.
> > > > > > > > >> >
> > > > > > > > >> > On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <
> > > > > > > kinguy@gmail.com>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > > Ah- yes good point (to be clear we're not using the
> > > proxy
> > > > > this
> > > > > > > way
> > > > > > > > >> > > today).
> > > > > > > > >> > >
> > > > > > > > >> > > > > Due to the source of the
> > > > > > > > >> > > > > data (HBase Replication), we cannot guarantee
> that a
> > > > > single
> > > > > > > > >> partition
> > > > > > > > >> > > will
> > > > > > > > >> > > > > be owned for writes by the same client.
> > > > > > > > >> > >
> > > > > > > > >> > > > Do you mean you *need* to support multiple writers
> > > issuing
> > > > > > > > >> interleaved
> > > > > > > > >> > > > writes or is it just that they might sometimes
> > > interleave
> > > > > > writes
> > > > > > > > and
> > > > > > > > >> > you
> > > > > > > > >> > > >don't care?
> > > > > > > > >> > > How HBase partitions the keys being written wouldn't
> > have
> > > a
> > > > > > > one->one
> > > > > > > > >> > > mapping with the partitions we would have in HBase.
> Even
> > > if
> > > > we
> > > > > > did
> > > > > > > > >> have
> > > > > > > > >> > > that alignment when the cluster first started, HBase
> > will
> > > > > > > rebalance
> > > > > > > > >> what
> > > > > > > > >> > > servers own what partitions, as well as split and
> merge
> > > > > > partitions
> > > > > > > > >> that
> > > > > > > > >> > > already exist, causing eventual drift from one log per
> > > > > > partition.
> > > > > > > > >> > > Because we want ordering guarantees per key (row in
> > > hbase),
> > > > we
> > > > > > > > >> partition
> > > > > > > > >> > > the logs by the key. Since multiple writers are
> possible
> > > per
> > > > > > range
> > > > > > > > of
> > > > > > > > >> > keys
> > > > > > > > >> > > (due to the aforementioned rebalancing / splitting /
> etc
> > > of
> > > > > > > hbase),
> > > > > > > > we
> > > > > > > > >> > > cannot use the core library due to requiring a single
> > > writer
> > > > > for
> > > > > > > > >> > ordering.
> > > > > > > > >> > >
> > > > > > > > >> > > But, for a single log, we don't really care about
> > ordering
> > > > > aside
> > > > > > > > from
> > > > > > > > >> at
> > > > > > > > >> > > the per-key level. So all we really need to be able to
> > > > handle
> > > > > is
> > > > > > > > >> > preventing
> > > > > > > > >> > > duplicates when a failure occurs, and ordering
> > consistency
> > > > > > across
> > > > > > > > >> > requests
> > > > > > > > >> > > from a single client.
> > > > > > > > >> > >
> > > > > > > > >> > > So our general requirements are:
> > > > > > > > >> > > Write A, Write B
> > > > > > > > >> > > Timeline: A -> B
> > > > > > > > >> > > Request B is only made after A has successfully
> returned
> > > > > > (possibly
> > > > > > > > >> after
> > > > > > > > >> > > retries)
> > > > > > > > >> > >
> > > > > > > > >> > > 1) If the write succeeds, it will be durably exposed
> to
> > > > > clients
> > > > > > > > within
> > > > > > > > >> > some
> > > > > > > > >> > > bounded time frame
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > Guaranteed.
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> > > 2) If A succeeds and B succeeds, the ordering for the
> > log
> > > > will
> > > > > > be
> > > > > > > A
> > > > > > > > >> and
> > > > > > > > >> > > then B
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > If I understand correctly here, B is only sent after A
> is
> > > > > > returned,
> > > > > > > > >> right?
> > > > > > > > >> > If that's the case, It is guaranteed.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > 3) If A fails due to an error that can be relied on to
> > > *not*
> > > > > be
> > > > > > a
> > > > > > > > lost
> > > > > > > > >> > ack
> > > > > > > > >> > > problem, it will never be exposed to the client, so it
> > may
> > > > > > > > (depending
> > > > > > > > >> on
> > > > > > > > >> > > the error) be retried immediately
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > If it is not a lost-ack problem, the entry will be
> > exposed.
> > > it
> > > > > is
> > > > > > > > >> > guaranteed.
> > > > > > > > >>
> > > > > > > > >> Let me try rephrasing the questions, to make sure I'm
> > > > > understanding
> > > > > > > > >> correctly:
> > > > > > > > >> If A fails, with an error such as "Unable to create
> > connection
> > > > to
> > > > > > > > >> bookkeeper server", that would be the type of error we
> would
> > > > > expect
> > > > > > to
> > > > > > > > be
> > > > > > > > >> able to retry immediately, as that result means no action
> > was
> > > > > taken
> > > > > > on
> > > > > > > > any
> > > > > > > > >> log / etc, so no entry could have been created. This is
> > > > different
> > > > > > > then a
> > > > > > > > >> "Connection Timeout" exception, as we just might not have
> > > > gotten a
> > > > > > > > >> response
> > > > > > > > >> in time.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > > Gotcha.
> > > > > > > > >
> > > > > > > > > The response code returned from proxy can tell if a failure
> > can
> > > > be
> > > > > > > > retried
> > > > > > > > > safely or not. (We might need to make them well documented)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > 4) If A fails due to an error that could be a lost ack
> > > > problem
> > > > > > > > >> (network
> > > > > > > > >> > > connectivity / etc), within a bounded time frame it
> > should
> > > > be
> > > > > > > > >> possible to
> > > > > > > > >> > > find out if the write succeed or failed. Either by
> > reading
> > > > > from
> > > > > > > some
> > > > > > > > >> > > checkpoint of the log for the changes that should have
> > > been
> > > > > made
> > > > > > > or
> > > > > > > > >> some
> > > > > > > > >> > > other possible server-side support.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > If I understand this correctly, it is a duplication
> issue,
> > > > > right?
> > > > > > > > >> >
> > > > > > > > >> > Can a de-duplication solution work here? Either DL or
> your
> > > > > client
> > > > > > > does
> > > > > > > > >> the
> > > > > > > > >> > de-duplication?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> The requirements I'm mentioning are the ones needed for
> > > > > client-side
> > > > > > > > >> dedupping. Since if I can guarantee writes being exposed
> > > within
> > > > > some
> > > > > > > > time
> > > > > > > > >> frame, and I can never get into an inconsistently ordered
> > > state
> > > > > when
> > > > > > > > >> successes happen, when an error occurs, I can always wait
> > for
> > > > max
> > > > > > time
> > > > > > > > >> frame, read the latest writes, and then dedup locally
> > against
> > > > the
> > > > > > > > request
> > > > > > > > >> I
> > > > > > > > >> just made.
> > > > > > > > >>
> > > > > > > > >> The main thing about that timeframe is that its basically
> > the
> > > > > > addition
> > > > > > > > of
> > > > > > > > >> every timeout, all the way down in the system, combined
> with
> > > > > > whatever
> > > > > > > > >> flushing / caching / etc times are at the bookkeeper /
> > client
> > > > > level
> > > > > > > for
> > > > > > > > >> when values are exposed
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Gotcha.
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> > Is there any ways to identify your write?
> > > > > > > > >> >
> > > > > > > > >> > I can think of a case as follow - I want to know what is
> > > your
> > > > > > > expected
> > > > > > > > >> > behavior from the log.
> > > > > > > > >> >
> > > > > > > > >> > a)
> > > > > > > > >> >
> > > > > > > > >> > If a hbase region server A writes a change of key K to
> the
> > > > log,
> > > > > > the
> > > > > > > > >> change
> > > > > > > > >> > is successfully made to the log;
> > > > > > > > >> > but server A is down before receiving the change.
> > > > > > > > >> > region server B took over the region that contains K,
> what
> > > > will
> > > > > B
> > > > > > > do?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> HBase writes in large chunks (WAL Logs), which its
> > replication
> > > > > > system
> > > > > > > > then
> > > > > > > > >> handles by replaying in the case of failure. If I'm in a
> > > middle
> > > > > of a
> > > > > > > > log,
> > > > > > > > >> and the whole region goes down and gets rescheduled
> > > elsewhere, I
> > > > > > will
> > > > > > > > >> start
> > > > > > > > >> back up from the beginning of the log I was in the middle
> > of.
> > > > > Using
> > > > > > > > >> checkpointing + deduping, we should be able to find out
> > where
> > > we
> > > > > > left
> > > > > > > > off
> > > > > > > > >> in the log.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > b) same as a). but server A was just network
> partitioned.
> > > will
> > > > > > both
> > > > > > > A
> > > > > > > > >> and B
> > > > > > > > >> > write the change of key K?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> HBase gives us some guarantees around network partitions
> > > > > > (Consistency
> > > > > > > > over
> > > > > > > > >> availability for HBase). HBase is a single-master failover
> > > > > recovery
> > > > > > > type
> > > > > > > > >> of
> > > > > > > > >> system, with zookeeper-based guarantees for single owners
> > > > > (writers)
> > > > > > > of a
> > > > > > > > >> range of data.
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > 5) If A is turned into multiple batches (one large
> > request
> > > > > gets
> > > > > > > > split
> > > > > > > > >> > into
> > > > > > > > >> > > multiple smaller ones to the bookkeeper backend, due
> to
> > > log
> > > > > > > rolling
> > > > > > > > /
> > > > > > > > >> > size
> > > > > > > > >> > > / etc):
> > > > > > > > >> > >   a) The ordering of entries within batches have
> > ordering
> > > > > > > > consistence
> > > > > > > > >> > with
> > > > > > > > >> > > the original request, when exposed in the log (though
> > they
> > > > may
> > > > > > be
> > > > > > > > >> > > interleaved with other requests)
> > > > > > > > >> > >   b) The ordering across batches have ordering
> > consistence
> > > > > with
> > > > > > > the
> > > > > > > > >> > > original request, when exposed in the log (though they
> > may
> > > > be
> > > > > > > > >> interleaved
> > > > > > > > >> > > with other requests)
> > > > > > > > >> > >   c) If a batch fails, and cannot be retried / is
> > > > > unsuccessfully
> > > > > > > > >> retried,
> > > > > > > > >> > > all batches after the failed batch should not be
> exposed
> > > in
> > > > > the
> > > > > > > log.
> > > > > > > > >> > Note:
> > > > > > > > >> > > The batches before and including the failed batch,
> that
> > > > ended
> > > > > up
> > > > > > > > >> > > succeeding, can show up in the log, again within some
> > > > bounded
> > > > > > time
> > > > > > > > >> range
> > > > > > > > >> > > for reads by a client.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > There is a method 'writeBulk' in DistributedLogClient
> can
> > > > > achieve
> > > > > > > this
> > > > > > > > >> > guarantee.
> > > > > > > > >> >
> > > > > > > > >> > However, I am not very sure about how will you turn A
> into
> > > > > > batches.
> > > > > > > If
> > > > > > > > >> you
> > > > > > > > >> > are dividing A into batches,
> > > > > > > > >> > you can simply control the application write sequence to
> > > > achieve
> > > > > > the
> > > > > > > > >> > guarantee here.
> > > > > > > > >> >
> > > > > > > > >> > Can you explain more about this?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> In this case, by batches I mean what the proxy does with
> the
> > > > > single
> > > > > > > > >> request
> > > > > > > > >> that I send it. If the proxy decides it needs to turn my
> > > single
> > > > > > > request
> > > > > > > > >> into multiple batches of requests, due to log rolling,
> size
> > > > > > > limitations,
> > > > > > > > >> etc, those would be the guarantees I need to be able to
> > > > > reduplicate
> > > > > > on
> > > > > > > > the
> > > > > > > > >> client side.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > A single record written by #write and A record set (set of
> > > > records)
> > > > > > > > > written by #writeRecordSet are atomic - they will not be
> > broken
> > > > > down
> > > > > > > into
> > > > > > > > > entries (batches). With the correct response code, you
> would
> > be
> > > > > able
> > > > > > to
> > > > > > > > > tell if it is a lost-ack failure or not. However there is a
> > > size
> > > > > > > > limitation
> > > > > > > > > for this - it can't not go beyond 1MB for current
> > > implementation.
> > > > > > > > >
> > > > > > > > > What is your expected record size?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > >
> > > > > > > > >> > > Since we can guarantee per-key ordering on the client
> > > side,
> > > > we
> > > > > > > > >> guarantee
> > > > > > > > >> > > that there is a single writer per-key, just not per
> log.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Do you need fencing guarantee in the case of network
> > > partition
> > > > > > > causing
> > > > > > > > >> > two-writers?
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > So if there was a
> > > > > > > > >> > > way to guarantee a single write request as being
> written
> > > or
> > > > > not,
> > > > > > > > >> within a
> > > > > > > > >> > > certain time frame (since failures should be rare
> > anyways,
> > > > > this
> > > > > > is
> > > > > > > > >> fine
> > > > > > > > >> > if
> > > > > > > > >> > > it is expensive), we can then have the client
> guarantee
> > > the
> > > > > > > ordering
> > > > > > > > >> it
> > > > > > > > >> > > needs.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > This sounds an 'exact-once' write (regarding retries)
> > > > > requirement
> > > > > > to
> > > > > > > > me,
> > > > > > > > >> > right?
> > > > > > > > >> >
> > > > > > > > >> Yes. I'm curious of how this issue is handled by
> Manhattan,
> > > > since
> > > > > > you
> > > > > > > > can
> > > > > > > > >> imagine a data store that ends up getting multiple writes
> > for
> > > > the
> > > > > > same
> > > > > > > > put
> > > > > > > > >> / get / etc, would be harder to use, and we are basically
> > > trying
> > > > > to
> > > > > > > > create
> > > > > > > > >> a log like that for HBase.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Are you guys replacing HBase WAL?
> > > > > > > > >
> > > > > > > > > In Manhattan case, the request will be first written to DL
> > > > streams
> > > > > by
> > > > > > > > > Manhattan coordinator. The Manhattan replica then will read
> > > from
> > > > > the
> > > > > > DL
> > > > > > > > > streams and apply the change. In the lost-ack case, the MH
> > > > > > coordinator
> > > > > > > > will
> > > > > > > > > just fail the request to client.
> > > > > > > > >
> > > > > > > > > My feeling here is your usage for HBase is a bit different
> > from
> > > > how
> > > > > > we
> > > > > > > > use
> > > > > > > > > DL in Manhattan. It sounds like you read from a source
> (HBase
> > > > WAL)
> > > > > > and
> > > > > > > > > write to DL. But I might be wrong.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > > Cameron:
> > > > > > > > >> > > > Another thing we've discussed but haven't really
> > thought
> > > > > > > through -
> > > > > > > > >> > > > We might be able to support some kind of epoch write
> > > > > request,
> > > > > > > > where
> > > > > > > > >> the
> > > > > > > > >> > > > epoch is guaranteed to have changed if the writer
> has
> > > > > changed
> > > > > > or
> > > > > > > > the
> > > > > > > > >> > > ledger
> > > > > > > > >> > > > was ever fenced off. Writes include an epoch and are
> > > > > rejected
> > > > > > if
> > > > > > > > the
> > > > > > > > >> > > epoch
> > > > > > > > >> > > > has changed.
> > > > > > > > >> > > > With a mechanism like this, fencing the ledger off
> > > after a
> > > > > > > failure
> > > > > > > > >> > would
> > > > > > > > >> > > > ensure any pending writes had either been written or
> > > would
> > > > > be
> > > > > > > > >> rejected.
> > > > > > > > >> > >
> > > > > > > > >> > > The issue would be how I guarantee the write I wrote
> to
> > > the
> > > > > > server
> > > > > > > > was
> > > > > > > > >> > > written. Since a network issue could happen on the
> send
> > of
> > > > the
> > > > > > > > >> request,
> > > > > > > > >> > or
> > > > > > > > >> > > on the receive of the success response, an epoch
> > wouldn't
> > > > tell
> > > > > > me
> > > > > > > > if I
> > > > > > > > >> > can
> > > > > > > > >> > > successfully retry, as it could be successfully
> written
> > > but
> > > > > AWS
> > > > > > > > >> dropped
> > > > > > > > >> > the
> > > > > > > > >> > > connection for the success response. Since the epoch
> > would
> > > > be
> > > > > > the
> > > > > > > > same
> > > > > > > > >> > > (same ledger), I could write duplicates.
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > > We are currently proposing adding a transaction
> > semantic
> > > > to
> > > > > dl
> > > > > > > to
> > > > > > > > >> get
> > > > > > > > >> > rid
> > > > > > > > >> > > > of the size limitation and the unaware-ness in the
> > proxy
> > > > > > client.
> > > > > > > > >> Here
> > > > > > > > >> > is
> > > > > > > > >> > > > our idea -
> > > > > > > > >> > > > http://mail-archives.apache.org/mod_mbox/incubator-
> > > > > > > distributedlog
> > > > > > > > >> > > -dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYm
> > > > > > > > >> > > <http://mail-archives.apache.org/mod_mbox/incubator-
> > > > > > > > >> > distributedlog%0A-dev/201609.mbox/%
> > > 3cCAAC6BxP5YyEHwG0ZCF5soh
> > > > > > > > 42X=xuYwYm>
> > > > > > > > >> > > L4nXsYBYiofzxpVk6g@mail.gmail.com%3e
> > > > > > > > >> > >
> > > > > > > > >> > > > I am not sure if your idea is similar as ours. but
> > we'd
> > > > like
> > > > > > to
> > > > > > > > >> > > collaborate
> > > > > > > > >> > > > with the community if anyone has the similar idea.
> > > > > > > > >> > >
> > > > > > > > >> > > Our use case would be covered by transaction support,
> > but
> > > > I'm
> > > > > > > unsure
> > > > > > > > >> if
> > > > > > > > >> > we
> > > > > > > > >> > > would need something that heavy weight for the
> > guarantees
> > > we
> > > > > > need.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > >
> > > > > > > > >> > > Basically, the high level requirement here is "Support
> > > > > > consistent
> > > > > > > > >> write
> > > > > > > > >> > > ordering for single-writer-per-key,
> > multi-writer-per-log".
> > > > My
> > > > > > > hunch
> > > > > > > > is
> > > > > > > > >> > > that, with some added guarantees to the proxy (if it
> > isn't
> > > > > > already
> > > > > > > > >> > > supported), and some custom client code on our side
> for
> > > > > removing
> > > > > > > the
> > > > > > > > >> > > entries that actually succeed to write to
> DistributedLog
> > > > from
> > > > > > the
> > > > > > > > >> request
> > > > > > > > >> > > that failed, it should be a relatively easy thing to
> > > > support.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > Yup. I think it should not be very difficult to support.
> > > There
> > > > > > might
> > > > > > > > be
> > > > > > > > >> > some changes in the server side.
> > > > > > > > >> > Let's figure out what will the changes be. Are you guys
> > > > > interested
> > > > > > > in
> > > > > > > > >> > contributing?
> > > > > > > > >> >
> > > > > > > > >> > Yes, we would be.
> > > > > > > > >>
> > > > > > > > >> As a note, the one thing that we see as an issue with the
> > > client
> > > > > > side
> > > > > > > > >> dedupping is how to bound the range of data that needs to
> be
> > > > > looked
> > > > > > at
> > > > > > > > for
> > > > > > > > >> deduplication. As you can imagine, it is pretty easy to
> > bound
> > > > the
> > > > > > > bottom
> > > > > > > > >> of
> > > > > > > > >> the range, as that it just regular checkpointing of the
> DSLN
> > > > that
> > > > > is
> > > > > > > > >> returned. I'm still not sure if there is any nice way to
> > time
> > > > > bound
> > > > > > > the
> > > > > > > > >> top
> > > > > > > > >> end of the range, especially since the proxy owns sequence
> > > > numbers
> > > > > > > > (which
> > > > > > > > >> makes sense). I am curious if there is more that can be
> done
> > > if
> > > > > > > > >> deduplication is on the server side. However the main
> minus
> > I
> > > > see
> > > > > of
> > > > > > > > >> server
> > > > > > > > >> side deduplication is that instead of running contingent
> on
> > > > there
> > > > > > > being
> > > > > > > > a
> > > > > > > > >> failed client request, instead it would have to run every
> > > time a
> > > > > > write
> > > > > > > > >> happens.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > For a reliable dedup, we probably need
> fence-then-getLastDLSN
> > > > > > > operation -
> > > > > > > > > so it would guarantee that any non-completed requests
> issued
> > > > > > (lost-ack
> > > > > > > > > requests) before this fence-then-getLastDLSN operation will
> > be
> > > > > failed
> > > > > > > and
> > > > > > > > > they will never land at the log.
> > > > > > > > >
> > > > > > > > > the pseudo code would look like below -
> > > > > > > > >
> > > > > > > > > write(request) onFailure { t =>
> > > > > > > > >
> > > > > > > > > if (t is timeout exception) {
> > > > > > > > >
> > > > > > > > > DLSN lastDLSN = fenceThenGetLastDLSN()
> > > > > > > > > DLSN lastCheckpointedDLSN = ...;
> > > > > > > > > // find if the request lands between [lastDLSN,
> > > > > > lastCheckpointedDLSN].
> > > > > > > > > // if it exists, the write succeed; otherwise retry.
> > > > > > > > >
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > }
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Just realized the idea is same as what Leigh raised in the
> > > previous
> > > > > > email
> > > > > > > > about 'epoch write'. Let me explain more about this idea
> > (Leigh,
> > > > feel
> > > > > > > free
> > > > > > > > to jump in to fill up your idea).
> > > > > > > >
> > > > > > > > - when a log stream is owned,  the proxy use the last
> > transaction
> > > > id
> > > > > as
> > > > > > > the
> > > > > > > > epoch
> > > > > > > > - when a client connects (handshake with the proxy), it will
> > get
> > > > the
> > > > > > > epoch
> > > > > > > > for the stream.
> > > > > > > > - the writes issued by this client will carry the epoch to
> the
> > > > proxy.
> > > > > > > > - add a new rpc - fenceThenGetLastDLSN - it would force the
> > proxy
> > > > to
> > > > > > bump
> > > > > > > > the epoch.
> > > > > > > > - if fenceThenGetLastDLSN happened, all the outstanding
> writes
> > > with
> > > > > old
> > > > > > > > epoch will be rejected with exceptions (e.g. EpochFenced).
> > > > > > > > - The DLSN returned from fenceThenGetLastDLSN can be used as
> > the
> > > > > bound
> > > > > > > for
> > > > > > > > deduplications on failures.
> > > > > > > >
> > > > > > > > Cameron, does this sound a solution to your use case?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >> Maybe something that could fit a similar need that Kafka
> > does
> > > > (the
> > > > > > > last
> > > > > > > > >> store value for a particular key in a log), such that on a
> > per
> > > > key
> > > > > > > basis
> > > > > > > > >> there could be a sequence number that support
> deduplication?
> > > > Cost
> > > > > > > seems
> > > > > > > > >> like it would be high however, and I'm not even sure if
> > > > bookkeeper
> > > > > > > > >> supports
> > > > > > > > >> it.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >> Cheers,
> > > > > > > > >> Cameron
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > > Cameron
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart
> > > > > > > > >> > <lstewart@twitter.com.invalid
> > > > > > > > >> > > >
> > > > > > > > >> > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Cameron:
> > > > > > > > >> > > > Another thing we've discussed but haven't really
> > thought
> > > > > > > through -
> > > > > > > > >> > > > We might be able to support some kind of epoch write
> > > > > request,
> > > > > > > > where
> > > > > > > > >> the
> > > > > > > > >> > > > epoch is guaranteed to have changed if the writer
> has
> > > > > changed
> > > > > > or
> > > > > > > > the
> > > > > > > > >> > > ledger
> > > > > > > > >> > > > was ever fenced off. Writes include an epoch and are
> > > > > rejected
> > > > > > if
> > > > > > > > the
> > > > > > > > >> > > epoch
> > > > > > > > >> > > > has changed.
> > > > > > > > >> > > > With a mechanism like this, fencing the ledger off
> > > after a
> > > > > > > failure
> > > > > > > > >> > would
> > > > > > > > >> > > > ensure any pending writes had either been written or
> > > would
> > > > > be
> > > > > > > > >> rejected.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <
> > > > sijie@apache.org
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Cameron,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > I think both Leigh and Xi had made a few good
> points
> > > > about
> > > > > > > your
> > > > > > > > >> > > question.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > To add one more point to your question - "but I am
> > not
> > > > > > > > >> > > > > 100% of how all of the futures in the code handle
> > > > > failures.
> > > > > > > > >> > > > > If not, where in the code would be the relevant
> > places
> > > > to
> > > > > > add
> > > > > > > > the
> > > > > > > > >> > > ability
> > > > > > > > >> > > > > to do this, and would the project be interested
> in a
> > > > pull
> > > > > > > > >> request?"
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > The current proxy and client logic doesn't do
> > > perfectly
> > > > on
> > > > > > > > >> handling
> > > > > > > > >> > > > > failures (duplicates) - the strategy now is the
> > client
> > > > > will
> > > > > > > > retry
> > > > > > > > >> as
> > > > > > > > >> > > best
> > > > > > > > >> > > > > at it can before throwing exceptions to users. The
> > > code
> > > > > you
> > > > > > > are
> > > > > > > > >> > looking
> > > > > > > > >> > > > for
> > > > > > > > >> > > > > - it is on BKLogSegmentWriter for the proxy
> handling
> > > > > writes
> > > > > > > and
> > > > > > > > >> it is
> > > > > > > > >> > > on
> > > > > > > > >> > > > > DistributedLogClientImpl for the proxy client
> > handling
> > > > > > > responses
> > > > > > > > >> from
> > > > > > > > >> > > > > proxies. Does this help you?
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > And also, you are welcome to contribute the pull
> > > > requests.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > - Sijie
> > > > > > > > >> > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <
> > > > > > > > >> kinguy@gmail.com>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > I have a question about the Proxy Client.
> > Basically,
> > > > for
> > > > > > our
> > > > > > > > use
> > > > > > > > >> > > cases,
> > > > > > > > >> > > > > we
> > > > > > > > >> > > > > > want to guarantee ordering at the key level,
> > > > > irrespective
> > > > > > of
> > > > > > > > the
> > > > > > > > >> > > > ordering
> > > > > > > > >> > > > > > of the partition it may be assigned to as a
> whole.
> > > Due
> > > > > to
> > > > > > > the
> > > > > > > > >> > source
> > > > > > > > >> > > of
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > data (HBase Replication), we cannot guarantee
> > that a
> > > > > > single
> > > > > > > > >> > partition
> > > > > > > > >> > > > > will
> > > > > > > > >> > > > > > be owned for writes by the same client. This
> means
> > > the
> > > > > > proxy
> > > > > > > > >> client
> > > > > > > > >> > > > works
> > > > > > > > >> > > > > > well (since we don't care which proxy owns the
> > > > partition
> > > > > > we
> > > > > > > > are
> > > > > > > > >> > > writing
> > > > > > > > >> > > > > > to).
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > However, the guarantees we need when writing a
> > batch
> > > > > > > consists
> > > > > > > > >> of:
> > > > > > > > >> > > > > > Definition of a Batch: The set of records sent
> to
> > > the
> > > > > > > > writeBatch
> > > > > > > > >> > > > endpoint
> > > > > > > > >> > > > > > on the proxy
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 1. Batch success: If the client receives a
> success
> > > > from
> > > > > > the
> > > > > > > > >> proxy,
> > > > > > > > >> > > then
> > > > > > > > >> > > > > > that batch is successfully written
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 2. Inter-Batch ordering : Once a batch has been
> > > > written
> > > > > > > > >> > successfully
> > > > > > > > >> > > by
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > client, when another batch is written, it will
> be
> > > > > > guaranteed
> > > > > > > > to
> > > > > > > > >> be
> > > > > > > > >> > > > > ordered
> > > > > > > > >> > > > > > after the last batch (if it is the same stream).
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 3. Intra-Batch ordering: Within a batch of
> writes,
> > > the
> > > > > > > records
> > > > > > > > >> will
> > > > > > > > >> > > be
> > > > > > > > >> > > > > > committed in order
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 4. Intra-Batch failure ordering: If an
> individual
> > > > record
> > > > > > > fails
> > > > > > > > >> to
> > > > > > > > >> > > write
> > > > > > > > >> > > > > > within a batch, all records after that record
> will
> > > not
> > > > > be
> > > > > > > > >> written.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 5. Batch Commit: Guarantee that if a batch
> > returns a
> > > > > > > success,
> > > > > > > > it
> > > > > > > > >> > will
> > > > > > > > >> > > > be
> > > > > > > > >> > > > > > written
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 6. Read-after-write: Once a batch is committed,
> > > > within a
> > > > > > > > limited
> > > > > > > > >> > > > > time-frame
> > > > > > > > >> > > > > > it will be able to be read. This is required in
> > the
> > > > case
> > > > > > of
> > > > > > > > >> > failure,
> > > > > > > > >> > > so
> > > > > > > > >> > > > > > that the client can see what actually got
> > > committed. I
> > > > > > > believe
> > > > > > > > >> the
> > > > > > > > >> > > > > > time-frame part could be removed if the client
> can
> > > > send
> > > > > in
> > > > > > > the
> > > > > > > > >> same
> > > > > > > > >> > > > > > sequence number that was written previously,
> since
> > > it
> > > > > > would
> > > > > > > > then
> > > > > > > > >> > fail
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > we would know that a read needs to occur.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > So, my basic question is if this is currently
> > > possible
> > > > > in
> > > > > > > the
> > > > > > > > >> > proxy?
> > > > > > > > >> > > I
> > > > > > > > >> > > > > > don't believe it gives these guarantees as it
> > stands
> > > > > > today,
> > > > > > > > but
> > > > > > > > >> I
> > > > > > > > >> > am
> > > > > > > > >> > > > not
> > > > > > > > >> > > > > > 100% of how all of the futures in the code
> handle
> > > > > > failures.
> > > > > > > > >> > > > > > If not, where in the code would be the relevant
> > > places
> > > > > to
> > > > > > > add
> > > > > > > > >> the
> > > > > > > > >> > > > ability
> > > > > > > > >> > > > > > to do this, and would the project be interested
> > in a
> > > > > pull
> > > > > > > > >> request?
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks,
> > > > > > > > >> > > > > > Cameron
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
