samza-dev mailing list archives

From Jef G <j...@dataminr.com>
Subject Re: buffering records for join
Date Thu, 12 Oct 2017 17:00:29 GMT
Yi, thanks for your detailed reply!

I believe what you are suggesting regarding a KV store is to set up a
remote durable system to maintain the join state. That way if a node dies
and Samza restarts the task on another node, the join state is still
available. Is that correct?

This approach is certainly an option. However, we were hoping to use an
in-process in-memory KV store, as a remote store would introduce a lot of
latency for us. In some cases we would have to make more than 100,000 round
trips per second to the KV store for a single stream, and we would want to
be able to scale beyond that. It also introduces some complexity and
another point of failure.
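
(Concretely, we had something like Samza's in-process in-memory store engine in
mind. A minimal configuration sketch, where the store name and serdes are just
placeholders, and assuming I have the factory class name right:)

{code}
# Keeps the join buffer in process memory; on its own this is not durable.
stores.join-buffer.factory=org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory
stores.join-buffer.key.serde=string
stores.join-buffer.msg.serde=bytes
{code}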

Regarding using AsyncStreamTask with a very large (100,000)
task.max.concurrency, is that a bad idea?
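
(For concreteness, the configuration we have in mind is roughly the following;
the task class is our own hypothetical implementation, and the properties are
standard Samza settings as we understand them:)

{code}
task.class=com.example.JoinBufferAsyncTask
# Allow up to 100,000 in-flight messages per task while their callbacks are held.
task.max.concurrency=100000
# Callbacks that are held for a long time may also need a larger timeout.
task.callback.timeout.ms=600000
{code}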

The high-level API is based on the KStream API, right? Our jobs will
sometimes need to join as many as 20 input streams. I believe Samza (and
KStream) currently only support binary joins, and if that is the case, we
would need 19 of them chained together. The KStream documentation suggests
that all intermediate results are persisted, so that many chained joins might
be very inefficient. If so, we would prefer to use the "classic" API.
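
(To illustrate the structure we would be forced into, here is a rough sketch in
plain Java. It is not the Samza or KStream API; it just shows N-1 left-deep
binary joins over N keyed inputs, with an intermediate result materialized at
each step:)

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChainedJoinSketch {
  /** Left-deep chain: ((in1 JOIN in2) JOIN in3) JOIN ... -- 19 binary joins for 20 inputs. */
  static Map<String, List<Object>> joinAll(List<Map<String, Object>> inputs) {
    Map<String, List<Object>> acc = new HashMap<>();
    inputs.get(0).forEach((k, v) -> {
      List<Object> row = new ArrayList<>();
      row.add(v);
      acc.put(k, row);
    });
    for (int i = 1; i < inputs.size(); i++) {
      Map<String, Object> right = inputs.get(i);
      // Inner join on the key: keep only keys seen on both sides so far.
      acc.keySet().retainAll(right.keySet());
      acc.forEach((k, row) -> row.add(right.get(k)));
      // In a streaming framework, each such intermediate result would be persisted.
    }
    return acc;
  }
}
{code}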

-Jef

On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpan47@gmail.com> wrote:

> Hi, Jef,
>
> I would recommend that you use a KV store to buffer the messages for the join.
> The logic would be more predictable and the state is also durable. In
> StreamTask.process(), you can do something like the pseudocode below:
> {code}
> public void process(IncomingMessageEnvelope msg, MessageCollector collector,
>     TaskCoordinator coordinator) {
>   String key = (String) msg.getKey();
>   if ("streamA".equals(msg.getSystemStreamPartition().getStream())) {
>     storeA.put(key, msg.getMessage());
>   } else {
>     storeB.put(key, msg.getMessage());
>   }
>   if (joinConditionIsTriggered(key)) {
>     doJoin(storeA, storeB, collector);
>   }
> }
> {code}
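>
> A slightly fuller sketch of the same idea, with the stores wired up in
> init() (the class, store, and stream names are just placeholders, and I am
> assuming byte[] serdes for the stored values):
>
> {code}
> import org.apache.samza.config.Config;
> import org.apache.samza.storage.kv.KeyValueStore;
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.task.*;
>
> public class JoinBufferTask implements StreamTask, InitableTask {
>   private KeyValueStore<String, byte[]> storeA;
>   private KeyValueStore<String, byte[]> storeB;
>
>   @Override
>   @SuppressWarnings("unchecked")
>   public void init(Config config, TaskContext context) {
>     // The names must match the stores.* entries in the job config.
>     storeA = (KeyValueStore<String, byte[]>) context.getStore("store-a");
>     storeB = (KeyValueStore<String, byte[]>) context.getStore("store-b");
>   }
>
>   @Override
>   public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>       TaskCoordinator coordinator) {
>     String key = (String) envelope.getKey();
>     byte[] value = (byte[]) envelope.getMessage();
>     if ("streamA".equals(envelope.getSystemStreamPartition().getStream())) {
>       storeA.put(key, value);
>     } else {
>       storeB.put(key, value);
>     }
>     // One possible trigger: join once the key has been seen on both inputs.
>     if (storeA.get(key) != null && storeB.get(key) != null) {
>       // build the joined record and emit it via collector.send(...)
>     }
>   }
> }
> {code}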
>
> Make sure that you configure storeA and storeB with changelogs so that they
> can be recovered. Then you don't need to worry about data loss, since before
> the auto-checkpoint your buffered messages are flushed to disk and to the
> changelog via storeA and storeB. If you do not want to delete each and every
> buffered message after the join, you can set a TTL on each store if you are
> using the RocksDB store.
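>
> For example, the store configuration would look roughly like this (the store
> and changelog names are placeholders; the property names are the usual
> stores.* keys as far as I recall):
>
> {code}
> stores.store-a.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> # The string/bytes serdes must be registered under serializers.registry.*.
> stores.store-a.key.serde=string
> stores.store-a.msg.serde=bytes
> # The changelog makes the store recoverable when a container is restarted elsewhere.
> stores.store-a.changelog=kafka.store-a-changelog
> # Optional: let RocksDB expire buffered entries instead of deleting them one by one.
> stores.store-a.rocksdb.ttl.ms=3600000
> {code}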
>
> We are also actively working on a built-in join operator in the Samza
> high-level API. The new high-level API was already released in Samza 0.13.1,
> with the feature preview here:
> http://samza.apache.org/startup/preview/#high-level-api. Feel free to take
> a look and try it; we would love to hear your feedback. The current version
> does not support durable state in joins yet; we are actively working on
> durable state support in the next release. Note that the high-level API is
> still evolving and might change in the next two releases.
>
> Best!
>
> -Yi
>
> On Wed, Oct 11, 2017 at 1:56 PM, Jef G <jefg@dataminr.com> wrote:
>
> > Hello. My team is looking into Samza for doing real-time processing. We
> > would like to run a directed graph of jobs, where the records in each
> > job's input streams are joined on a common key. We have functionality to
> > perform the join by buffering records from the input streams until
> > certain conditions are met and then passing them on.
> >
> > We are wondering about the best way to integrate this functionality into
> > a Samza job. After looking over the API we see two possibilities:
> >
> > 1. Use a StreamTask that adds records to a buffer. This is the method
> > that the "ad event" example uses. But we are concerned that the framework
> > commits a StreamTask's offset after process() completes, so if the job
> > fails, records in the buffer are permanently lost.
> >
> > 2. Use an AsyncStreamTask that adds records to a buffer. Also add
> > TaskCallbacks to the buffer. When records are eventually joined and
> > processed, commit their callbacks. This method seems promising, but it
> > requires setting task.max.concurrency very high - possibly in the tens
> > of thousands in our case. Are we likely to run into any issues doing
> > that?
> >
> > Are there any other options that we overlooked? What is the best
> > approach?
> >
> > -Jef G
> >
>



-- 
Jef G
Senior Data Scientist | Dataminr | dataminr.com
99 Madison Ave, 3rd Floor | New York, NY 10016
jefg@dataminr.com
