samza-dev mailing list archives

From Felix GV <fville...@linkedin.com.INVALID>
Subject RE: How do you serve the data computed by Samza?
Date Thu, 02 Apr 2015 23:08:08 GMT
Hi Roger,

The slow storage shard situation is indeed a concern.

If the incoming Kafka stream partitions don't line up with the storage partitions, the slow
storage shard will back up your pusher process for all shards. Alternatively, your pusher
process can keep going with the healthy shards, but it will then need to re-consume the whole
input stream just to push to the unhealthy shard after it has recovered. LinkedIn already has
(at least) one or two such (push) systems and they definitely work, but they have some
operational limitations, as discussed here.
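
To make the misalignment concrete, here is a minimal sketch. It assumes both Kafka and the
storage system route a key with the same hash taken modulo their own partition count; that
routing rule, the key names and the counts are all made up for illustration, and real systems
may partition differently.

```python
import zlib


def bucket(key: str, count: int) -> int:
    """Hypothetical routing rule: stable hash of the key, modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % count


def shards_per_partition(keys, kafka_partitions, storage_shards):
    """For each Kafka partition, collect the storage shards its keys land on."""
    touched = {p: set() for p in range(kafka_partitions)}
    for key in keys:
        touched[bucket(key, kafka_partitions)].add(bucket(key, storage_shards))
    return touched


keys = [f"member-{i}" for i in range(10_000)]

# Partition counts differ: a single Kafka partition feeds several storage shards,
# so one slow shard stalls a consumer that also carries data for healthy shards.
misaligned = shards_per_partition(keys, kafka_partitions=8, storage_shards=6)

# Same count and same hash: partition p only ever feeds shard p.
aligned = shards_per_partition(keys, kafka_partitions=6, storage_shards=6)

for name, mapping in (("misaligned", misaligned), ("aligned", aligned)):
    print(name, {p: sorted(s) for p, s in mapping.items()})
```

In the aligned case, a slow shard only delays the one partition that feeds it.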

Now, you can make the pusher process' input topic partitions line up with the storage partitions,
and that would alleviate the slow shard problem, but if you go through the trouble of partitioning
your input stream this way, then you're not really far off from the pull model anyway. Furthermore,
even with lined up partitioning, the push approach will still have the following caveats compared
to pull:

  1.  An extra hop. The hop is probably not a big deal in terms of latency, but it does double
your bandwidth requirements (and more than double if the storage system does not support batch
operations which are as efficient as Kafka's).
  2.  Smart throttling. You can throttle with a push approach, but it is very crude. Likely
the throttling would be a fixed QPS per topic or something like that. If you want to take
into account the serving latency of the storage nodes, in order to back off when the storage
node is under higher load, then it seems easier to achieve that accurately if you're co-located
with the storage node, as you would be able to poll its performance metrics quicker and more
often.
  3.  Multi-tenancy. If you want many different topics getting fed into the storage system,
but with different priorities for each, then it may also be easier to prioritize the various
streams against one another in the pull model. If your push process consumes partitions that
line up with the storage, then technically you can achieve the same thing, but then your push
process deployment and config become a bit more complex. The pushers are no longer just
stateless auto-balanced consumer processes; they need to be tied 1:1 or 1:M with the storage
nodes. At this point, you have already gone about 80% of the way towards the pull model, so I
would argue it makes things simpler to just go all the way.
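
As a rough illustration of the "smart throttling" point, here is a minimal sketch of a
latency-aware rate controller. The additive-increase/multiplicative-decrease policy is my own
choice for the example and not something prescribed in this thread. The class name, the
thresholds and the rates are all hypothetical, and how the p99 latency is read from the storage
node (JMX or otherwise) is left out.

```python
from dataclasses import dataclass


@dataclass
class LatencyThrottle:
    """AIMD-style controller for the put rate of a co-located consumer (illustrative only)."""

    target_p99_ms: float          # hypothetical latency threshold for the storage node
    rate: float = 1_000.0         # current allowed puts per second
    min_rate: float = 10.0        # never throttle below this floor
    max_rate: float = 10_000.0    # never exceed this ceiling
    step: float = 100.0           # additive increase while the node is healthy
    backoff: float = 0.5          # multiplicative decrease when latency degrades

    def update(self, observed_p99_ms: float) -> float:
        """Feed in the latest p99 reading and get back the new allowed rate."""
        if observed_p99_ms > self.target_p99_ms:
            # Storage node is under load: back off quickly.
            self.rate = max(self.min_rate, self.rate * self.backoff)
        else:
            # Storage node is healthy: ramp back up slowly.
            self.rate = min(self.max_rate, self.rate + self.step)
        return self.rate


throttle = LatencyThrottle(target_p99_ms=20.0)
print(throttle.update(35.0))  # latency above threshold, so the rate is halved
print(throttle.update(12.0))  # latency healthy again, so the rate grows by one step
```

The closer the consumer sits to the storage node, the more often it can afford to call
something like `update`, which is what makes the feedback loop tight.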

Of course, there are definitely many ways to skin this cat. And some of the concerns above shouldn't
be someone's highest priority if they're just getting started playing around with Samza. But
I think it would be nice to have an open-source system available which "does all the right
things", so that even newcomers to Samza can have an easy way to ingest data into their serving
system.

My 2 cents.

--

Felix GV
Data Infrastructure Engineer
Distributed Data Systems
LinkedIn

fgv@linkedin.com
linkedin.com/in/felixgv

________________________________________
From: Roger Hoover [roger.hoover@gmail.com]
Sent: Thursday, April 02, 2015 3:24 PM
To: dev@samza.apache.org
Subject: Re: How do you serve the data computed by Samza?

Is it because the Kafka partitioning might not be the same as the storage
partitioning? So that a slow storage shard will prevent unrelated shards
from getting their messages?

Ah, I think I see what you mean. If so, then the solution is to make the
Kafka partitioning match the storage partitioning. In that case, push or
pull is the same, yeah?

Thanks,

Roger

On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover <roger.hoover@gmail.com> wrote:

> Chinmay,
>
> Thanks for your input.
>
> I'm not understanding what the difference is. With the design that Felix
> laid out, the co-located Kafka consumer is still doing a push to the
> storage system, right? It just happens to be on the same machine. How is
> this different from pushing batches from a non-local Samza job? How does
> the pull-based approach you're thinking of deal with feedback and SLAs?
>
> Thanks,
>
> Roger
>
>
>
> On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
> wrote:
>
>> My 2 cents => One thing to note about the push model : multi-tenancy
>>
>> When your storage system (Druid for example) is used in a multi-tenant
>> fashion - then push model is a bit difficult to operate. Primarily because
>> there is no real feedback loop from the storage system. Yes - if the
>> storage system starts doing bad - then you get timeouts and higher
>> latencies - but then you're already in a position where you're probably
>> breaking SLAs (for some tenant).
>>
>> In that sense, a pull model might be better since the consumer can
>> potentially have more visibility into how this particular node is doing.
>> Also, the Kafka consumer batches things up - so theoretically - you
>> could get similar throughput. Downside of this approach is of course - the
>> storage system partitioning scheme *has to* line up with the Kafka
>> partitioning scheme.
>>
>> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover <roger.hoover@gmail.com>
>> wrote:
>>
>> > Felix,
>> >
>> > I see your point about simple Kafka consumers. My thought was that if
>> > you're already managing a Samza/YARN deployment then these types of jobs
>> > would be "just another job" and not require an additional process
>> > management/monitoring/operations setup. If you've already got a way to
>> > handle vanilla Kafka jobs then it makes sense.
>> >
>> > For the push model, the way we're planning to deal with the latency of
>> > round-trip calls is to batch up pushes to the downstream system. Both
>> > Druid Tranquility and the ES transport node protocol allow you to batch
>> > index requests. I'm curious if pull would be that much more efficient.
>> >
>> > Cheers,
>> >
>> > Roger
>> >
>> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV <fvillegas@linkedin.com.invalid>
>> > wrote:
>> >
>> > > Hi Roger,
>> > >
>> > > You bring up good points, and I think the short answer is that there
>> > > are trade-offs to everything, of course (:
>> > >
>> > > What I described could definitely be implemented as a Samza job, and I
>> > > think that would make a lot of sense if the data serving system was also
>> > > deployed via YARN. This way, the Samza tasks responsible for ingesting
>> > > and populating the data serving system's nodes could be spawned wherever
>> > > YARN knows these nodes are located. For data serving systems not well
>> > > integrated with YARN however, I'm not sure that there would be that much
>> > > win in using the Samza deployment model. And since the consumers
>> > > themselves are pretty simple (no joining of streams, no local state,
>> > > etc.), this seems to be a case where Samza is a bit overkill and a
>> > > regular Kafka consumer is perfectly fine (except for the YARN-enabled
>> > > auto-deployment aspect, like I mentioned).
>> > >
>> > > As for push versus pull, I think the trade-off is the following: push is
>> > > mostly simpler and more decoupled, as you said, but I think pull would be
>> > > more efficient. The reason for that is that Kafka consumption is very
>> > > efficient (thanks to batching and compression), but most data serving
>> > > systems don't provide a streaming ingest API for pushing data efficiently
>> > > to them, instead they have single record put/insert APIs which require a
>> > > round-trip to be acknowledged. This is perfectly fine in low-throughput
>> > > scenarios, but does not support very high throughput of ingestion like
>> > > Kafka can provide. By co-locating the pulling process (i.e.: Kafka
>> > > consumer) with the data serving node, it makes it a bit more affordable
>> > > to do single puts since the (local) round-trip acks would be
>> > > near-instantaneous. Pulling also makes the tracking of offsets across
>> > > different nodes a bit easier, since each node can consume at its own
>> > > pace, and resume at whatever point in the past it needs (i.e.: rewind)
>> > > without affecting the other replicas. Tracking offsets across many
>> > > replicas in the push model is a bit more annoying, though still doable,
>> > > of course.
>> > >
>> > > --
>> > >
>> > > Felix GV
>> > > Data Infrastructure Engineer
>> > > Distributed Data Systems
>> > > LinkedIn
>> > >
>> > > fgv@linkedin.com
>> > > linkedin.com/in/felixgv
>> > >
>> > > ________________________________________
>> > > From: Roger Hoover [roger.hoover@gmail.com]
>> > > Sent: Tuesday, March 31, 2015 8:57 PM
>> > > To: dev@samza.apache.org
>> > > Subject: Re: How do you serve the data computed by Samza?
>> > >
>> > > Ah, thanks for the great explanation. Any particular reason that the
>> > > job(s) you described should not be Samza jobs?
>> > >
>> > > We've started experimenting with such jobs for Druid and Elasticsearch.
>> > > For Elasticsearch, the Samza job containers join the Elasticsearch cluster
>> > > as transport nodes and use the Java API to push to the ES data nodes.
>> > > Likewise for Druid, the Samza job uses the Tranquility API to schedule jobs
>> > > (https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza).
>> > >
>> > > The nice part about push versus pull is that the downstream system does
>> > > not need plugins (like ES rivers) that may complicate its configuration
>> > > or destabilize the system.
>> > >
>> > > Cheers,
>> > >
>> > > Roger
>> > >
>> > > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV <fvillegas@linkedin.com.invalid>
>> > > wrote:
>> > >
>> > > > Thanks for your reply Roger! Very insightful (:
>> > > >
>> > > > > 6. If there was a highly-optimized and reliable way of ingesting
>> > > > > partitioned streams quickly into your online serving system, would
>> > > > > that help you leverage Samza more effectively?
>> > > >
>> > > > >> 6. Can you elaborate please?
>> > > >
>> > > > Sure. The feature set I have in mind is the following:
>> > > >
>> > > > * Provide a thinly-wrapped Kafka producer which does appropriate
>> > > > partitioning and includes useful metadata (such as production
>> > > > timestamp, etc.) alongside the payload. This producer would be used in
>> > > > the last step of processing of a Samza topology, in order to emit to
>> > > > Kafka some processed/joined/enriched data which is destined for online
>> > > > serving.
>> > > > * Provide a consumer process which can be co-located on the same hosts
>> > > > as your data serving system. This process consumes from the appropriate
>> > > > partitions and checkpoints its offsets on its own. It leverages Kafka
>> > > > batching and compression to make consumption very efficient.
>> > > > * For each record, the consumer process issues a put/insert locally to
>> > > > the co-located serving process. Since this is a local operation, it is
>> > > > also very cheap and efficient.
>> > > > * The consumer process can also optionally throttle its insertion rate
>> > > > by monitoring some performance metrics of the co-located data serving
>> > > > process. For example, if the data serving process exposes a p99 latency
>> > > > via JMX or other means, this can be used in a tight feedback loop to
>> > > > back off if read latency degrades beyond a certain threshold.
>> > > > * This ingestion platform should be easy to integrate with any
>> > > > consistently-routed data serving system, by implementing some simple
>> > > > interfaces to let the ingestion system understand the key-to-partition
>> > > > assignment strategy, as well as the partition-to-node assignment
>> > > > strategy. Optionally, a hook to access performance metrics could also
>> > > > be implemented if throttling is deemed important (as described in the
>> > > > previous point).
>> > > > * Since the consumer process lives in a separate process, the system
>> > > > benefits from good isolation guarantees. The consumer process can be
>> > > > capped to a low amount of heap, and its GC is inconsequential for the
>> > > > serving platform. It's also possible to bounce the consumer and data
>> > > > serving processes independently of each other, if need be.
>> > > >
>> > > > There are some more nuances and additional features which could be nice
>> > > > to have, but that's the general idea.
>> > > >
>> > > >
>> > > > It seems to me like such a system would be valuable, but I'm wondering
>> > > > what other people in the open-source community think, hence why I was
>> > > > interested in starting this thread...
>> > > >
>> > > >
>> > > > Thanks for your feedback!
>> > > >
>> > > > -F
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> Thanks and regards
>>
>> Chinmay Soman
>>
>
>
