samza-dev mailing list archives

From Jef G <j...@dataminr.com>
Subject Re: buffering records for join
Date Fri, 13 Oct 2017 00:09:10 GMT
Yi, thank you for your detailed and informative replies.

-Jef

On Thu, Oct 12, 2017 at 5:49 PM, Yi Pan <nickpan47@gmail.com> wrote:

> Oh, just want to follow up on the question on AsyncStreamTask. The main
> consideration in setting a large task.max.concurrency value is the memory
> required.
>
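> As a rough illustration, the relevant knobs look like the following (a
> sketch; the concurrency value is only an example, and the callback
> timeout is optional):
>
> {code}
> # Maximum number of outstanding messages processed concurrently per task.
> # Each in-flight message (and its pending callback) is held in memory.
> task.max.concurrency=1000
>
> # Fail a message if its callback is not completed within this window.
> task.callback.timeout.ms=60000
> {code}
>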
> And assuming your join function works like micro-batches (i.e. buffer all
> input messages from all streams for a while, apply the join function, and
> discard all buffered messages), you can make it work without a local KV
> store. If any of your buffered messages are not for the current join but
> are needed for a future join, you may have to use a local KV store to
> ensure correctness and no data loss across container restarts.
>
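> A minimal sketch of that micro-batch style, assuming a plain in-memory
> buffer is acceptable (messages buffered since the last window are lost if
> the container dies before the join runs; joinAndEmit is a hypothetical
> helper):
>
> {code}
> import java.util.*;
>
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.task.*;
>
> public class MicroBatchJoinTask implements StreamTask, WindowableTask {
>   // Messages buffered since the last window, grouped by join key.
>   private final Map<String, List<Object>> buffered = new HashMap<>();
>
>   @Override
>   public void process(IncomingMessageEnvelope envelope,
>       MessageCollector collector, TaskCoordinator coordinator) {
>     String key = (String) envelope.getKey();
>     buffered.computeIfAbsent(key, k -> new ArrayList<>())
>             .add(envelope.getMessage());
>   }
>
>   // Called every task.window.ms: join the batch, emit, then discard it.
>   @Override
>   public void window(MessageCollector collector, TaskCoordinator coordinator) {
>     joinAndEmit(buffered, collector);  // hypothetical join helper
>     buffered.clear();
>   }
> }
> {code}
>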
> -Yi
>
> On Thu, Oct 12, 2017 at 11:40 AM, Yi Pan <nickpan47@gmail.com> wrote:
>
> > Hi, Jef,
> >
> > What I suggest is exactly an in-process, in-memory KV store. Samza has
> > two types of such built-in KV stores: in-memory and RocksDB. Both can be
> > backed by a changelog topic in Kafka as the failure recovery mechanism
> > (i.e. if a container fails, it can reseed the whole store from the
> > changelog topic in Kafka). Since they are built-in KV stores in Samza,
> > no additional external systems are introduced in the solution. Here is
> > the doc link for state management in Samza:
> > http://samza.apache.org/learn/documentation/0.13/container/state-management.html.
> > Please read the "Local State" section.
> >
> > The Samza high-level API is not based on the KStream API. It supports
> > two-way joins for now, but as long as the join keys are in the same
> > partition, the intermediate results can stay in memory and do not need
> > to go through additional persistent topics.
> >
> > Best!
> >
> > -Yi
> >
> > On Thu, Oct 12, 2017 at 10:00 AM, Jef G <jefg@dataminr.com> wrote:
> >
> >> Yi, thanks for your detailed reply!
> >>
> >> I believe what you are suggesting regarding a KV store is to set up a
> >> remote durable system to maintain the join state. That way if a node
> >> dies and Samza restarts the task on another node, the join state is
> >> still available. Is that correct?
> >>
> >> This approach is certainly an option. However, we were hoping to use an
> >> in-process in-memory KV store, as a remote store would introduce a lot
> >> of latency for us. In some cases we would have to make more than
> >> 100,000 round trips per second to the KV store for a single stream, and
> >> we would want to be able to scale beyond that. It also introduces some
> >> complexity and another point of failure.
> >>
> >> Regarding using AsyncStreamTask with a very large (100,000)
> >> task.max.concurrency, is that a bad idea?
> >>
> >> The high-level API is based on the KStream API, right? Our jobs will
> >> sometimes need to join as many as 20 input streams. I believe currently
> >> Samza (and KStream) only supports a binary join, and if that is the
> >> case, we would need 19 binary joins. The KStream doc suggests that all
> >> intermediate results are persisted, so many chained joins might be very
> >> inefficient. If so we would prefer to use the "classic" API.
> >>
> >> -Jef
> >>
> >> On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >>
> >> > Hi, Jef,
> >> >
> >> > I would recommend that you use a KV store to buffer the messages for
> >> > the join. The logic would be more predictable and the state is also
> >> > durable. In StreamTask.process(), you can do something like the
> >> > pseudocode below:
> >> > {code}
> >> > public void process(IncomingMessageEnvelope envelope,
> >> >     MessageCollector collector, TaskCoordinator coordinator) {
> >> >   // Buffer the message in the store for the stream it came from.
> >> >   String stream = envelope.getSystemStreamPartition().getStream();
> >> >   String key = (String) envelope.getKey();
> >> >   if ("streamA".equals(stream)) {
> >> >     storeA.put(key, envelope.getMessage());
> >> >   } else {
> >> >     storeB.put(key, envelope.getMessage());
> >> >   }
> >> >   // Once both sides have arrived for this key, emit the join result.
> >> >   if (joinConditionMet(key)) {
> >> >     doJoin(key, storeA, storeB, collector);
> >> >   }
> >> > }
> >> > {code}
> >> >
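> >> > For completeness, the stores themselves would typically be obtained
> >> > in init() by also implementing InitableTask (store names here are
> >> > illustrative):
> >> >
> >> > {code}
> >> > private KeyValueStore<String, Object> storeA;
> >> > private KeyValueStore<String, Object> storeB;
> >> >
> >> > @Override
> >> > public void init(Config config, TaskContext context) {
> >> >   storeA = (KeyValueStore<String, Object>) context.getStore("store-a");
> >> >   storeB = (KeyValueStore<String, Object>) context.getStore("store-b");
> >> > }
> >> > {code}
> >> >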
> >> > Make sure that you configure storeA and storeB with a changelog so
> >> > that they can be recovered. Then you don't need to worry about data
> >> > loss, since before the auto-checkpoint your buffered messages are
> >> > flushed to disk and to the changelog via storeA and storeB. If you do
> >> > not want to delete each and every buffered message after the join,
> >> > you can set a TTL for each store if you are using the RocksDB store.
> >> >
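> >> > In config, that looks roughly like this (the store name, changelog
> >> > topic, and TTL value are illustrative):
> >> >
> >> > {code}
> >> > # Back the store with a Kafka changelog topic for recovery.
> >> > stores.store-a.changelog=kafka.store-a-changelog
> >> > # Optionally expire buffered entries; RocksDB stores only.
> >> > stores.store-a.rocksdb.ttl.ms=3600000
> >> > {code}
> >> >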
> >> > We are also actively working on built-in join operators in the Samza
> >> > high-level APIs. The new high-level APIs are already released in Samza
> >> > 0.13.1 with the feature preview here:
> >> > http://samza.apache.org/startup/preview/#high-level-api. Feel free to
> >> > take a look and try it; we would love to hear your feedback. The
> >> > current version does not support durable state in joins yet. We are
> >> > actively working on durable state support in the next release. Note
> >> > that the high-level API is still in early evolution and might change
> >> > in the next two releases.
> >> >
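> >> > To give a flavor of it, a join in the high-level API is expressed
> >> > through a JoinFunction. The sketch below follows the preview docs,
> >> > but the message types (Order, Shipment, FulfilledOrder) are made up
> >> > for illustration and the exact generics may differ in 0.13.1:
> >> >
> >> > {code}
> >> > public class OrderShipmentJoiner
> >> >     implements JoinFunction<String, Order, Shipment, FulfilledOrder> {
> >> >   @Override
> >> >   public FulfilledOrder apply(Order order, Shipment shipment) {
> >> >     // Combine the two sides into the joined output message.
> >> >     return new FulfilledOrder(order, shipment);
> >> >   }
> >> >
> >> >   @Override
> >> >   public String getFirstKey(Order order) {
> >> >     return order.getOrderId();
> >> >   }
> >> >
> >> >   @Override
> >> >   public String getSecondKey(Shipment shipment) {
> >> >     return shipment.getOrderId();
> >> >   }
> >> > }
> >> > {code}
> >> >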
> >> > Best!
> >> >
> >> > -Yi
> >> >
> >> > On Wed, Oct 11, 2017 at 1:56 PM, Jef G <jefg@dataminr.com> wrote:
> >> >
> >> > > Hello. My team is looking into Samza for doing real-time
> >> > > processing. We would like to run a directed graph of jobs, where the
> >> > > records in each job's input streams are joined on a common key. We
> >> > > have functionality to perform the join by buffering records from the
> >> > > input streams until certain conditions are met and then passing them
> >> > > on.
> >> > >
> >> > > We are wondering about the best way to integrate this functionality
> >> > > into a Samza job. After looking over the API we see two
> >> > > possibilities:
> >> > >
> >> > > 1. Use a StreamTask that adds records to a buffer. This is the
> >> > > method that the "ad event" example uses. But we are concerned that
> >> > > the framework commits a StreamTask's offset after process()
> >> > > completes, so if the job fails, records in the buffer are
> >> > > permanently lost.
> >> > >
> >> > > 2. Use an AsyncStreamTask that adds records to a buffer. Also add
> >> > > TaskCallbacks to the buffer. When records are eventually joined and
> >> > > processed, complete their callbacks, as in the sketch below. This
> >> > > method seems promising but it requires setting task.max.concurrency
> >> > > very high - possibly in the tens of thousands in our case. Are we
> >> > > likely to run into any issues doing that?
> >> > >
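> >> > > A rough sketch of what we have in mind (PendingMessage,
> >> > > joinConditionMet, and emitJoin are hypothetical helpers):
> >> > >
> >> > > {code}
> >> > > public class BufferingJoinTask implements AsyncStreamTask {
> >> > >   // Buffered messages and their callbacks, grouped by join key.
> >> > >   private final Map<String, List<PendingMessage>> buffer = new HashMap<>();
> >> > >
> >> > >   @Override
> >> > >   public void processAsync(IncomingMessageEnvelope envelope,
> >> > >       MessageCollector collector, TaskCoordinator coordinator,
> >> > >       TaskCallback callback) {
> >> > >     String key = (String) envelope.getKey();
> >> > >     buffer.computeIfAbsent(key, k -> new ArrayList<>())
> >> > >           .add(new PendingMessage(envelope, callback));
> >> > >     if (joinConditionMet(key)) {
> >> > >       emitJoin(key, collector);
> >> > >       // Only now acknowledge the buffered messages, so offsets are
> >> > >       // not committed past records that have not been joined yet.
> >> > >       for (PendingMessage p : buffer.remove(key)) {
> >> > >         p.callback.complete();
> >> > >       }
> >> > >     }
> >> > >   }
> >> > > }
> >> > > {code}
> >> > >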
> >> > > Are there any other options that we overlooked? What is the best
> >> > > approach?
> >> > >
> >> > > -Jef G
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Jef G
> >> Senior Data Scientist | Dataminr | dataminr.com
> >> 99 Madison Ave, 3rd Floor | New York, NY 10016
> >> jefg@dataminr.com
> >>
> >
> >
>



-- 
Jef G
Senior Data Scientist | Dataminr | dataminr.com
99 Madison Ave, 3rd Floor | New York, NY 10016
jefg@dataminr.com
