From: Felix GV
To: dev@samza.apache.org
Subject: RE: How do you serve the data computed by Samza?
Date: Thu, 2 Apr 2015 22:07:22 +0000

That's a good point Chinmay. Doing smart throttling is more complex and not as reliable in the push model.

Your point that storage partitioning and Kafka partitioning need to line up is fair. It is indeed an assumption I am making in the proposed design.

--

Felix GV
Data Infrastructure Engineer
Distributed Data Systems
LinkedIn

fgv@linkedin.com
linkedin.com/in/felixgv

________________________________________
From: Chinmay Soman [chinmay.cerebro@gmail.com]
Sent: Thursday, April 02, 2015 2:54 PM
To: dev@samza.apache.org
Subject: Re: How do you serve the data computed by Samza?

My 2 cents => one thing to note about the push model: multi-tenancy.

When your storage system (Druid, for example) is used in a multi-tenant fashion, the push model is a bit difficult to operate.
This is primarily because there is no real feedback loop from the storage system. Yes, if the storage system starts doing badly you get timeouts and higher latencies, but at that point you're already in a position where you're probably breaking SLAs (for some tenant).

In that sense, a pull model might be better, since the consumer can potentially have more visibility into how this particular node is doing. Also, the Kafka consumer batches things up, so theoretically you could get similar throughput. The downside of this approach, of course, is that the storage system's partitioning scheme *has to* line up with the Kafka partitioning scheme.

On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover wrote:

> Felix,
>
> I see your point about simple Kafka consumers. My thought was that if
> you're already managing a Samza/YARN deployment then these types of jobs
> would be "just another job" and not require an additional process
> management/monitoring/operations setup. If you've already got a way to
> handle vanilla Kafka jobs then it makes sense.
>
> For the push model, the way we're planning to deal with the latency of
> round-trip calls is to batch up pushes to the downstream system. Both Druid
> Tranquility and the ES transport node protocol allow you to batch index
> requests. I'm curious if pull would be that much more efficient.
>
> Cheers,
>
> Roger
>
> On Wed, Apr 1, 2015 at 10:26 AM, Felix GV wrote:
>
> > Hi Roger,
> >
> > You bring up good points, and I think the short answer is that there are
> > trade-offs to everything, of course (:
> >
> > What I described could definitely be implemented as a Samza job, and I
> > think that would make a lot of sense if the data serving system was also
> > deployed via YARN. This way, the Samza tasks responsible for ingesting and
> > populating the data serving system's nodes could be spawned wherever YARN
> > knows these nodes are located. For data serving systems not well integrated
> > with YARN, however, I'm not sure that there would be that much win in using
> > the Samza deployment model. And since the consumers themselves are pretty
> > simple (no joining of streams, no local state, etc.), this seems to be a
> > case where Samza is a bit overkill and a regular Kafka consumer is
> > perfectly fine (except for the YARN-enabled auto-deployment aspect, like I
> > mentioned).
> >
> > As for push versus pull, I think the trade-off is the following: push is
> > mostly simpler and more decoupled, as you said, but I think pull would be
> > more efficient. The reason for that is that Kafka consumption is very
> > efficient (thanks to batching and compression), but most data serving
> > systems don't provide a streaming ingest API for pushing data efficiently
> > to them; instead they have single-record put/insert APIs which require a
> > round-trip to be acknowledged. This is perfectly fine in low-throughput
> > scenarios, but does not support the very high ingestion throughput that
> > Kafka can provide. By co-locating the pulling process (i.e. the Kafka
> > consumer) with the data serving node, it becomes a bit more affordable to
> > do single puts, since the (local) round-trip acks would be
> > near-instantaneous.
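A minimal sketch of the co-located pull consumer described above, written against the newer Java KafkaConsumer API (not necessarily the consumer client that existed at the time of this thread); LocalStoreClient and the topic name are hypothetical stand-ins, not an existing API:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CoLocatedIngester {

        /** Hypothetical stand-in for the serving node's local single-record put API. */
        interface LocalStoreClient {
            void put(byte[] key, byte[] value);
        }

        public static void run(LocalStoreClient store) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "serving-node-ingest");
            props.put("enable.auto.commit", "false"); // checkpoint offsets only after local puts succeed
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("enriched-output"));
                while (true) {
                    // Batched, compressed fetch from Kafka...
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // ...followed by cheap, near-instantaneous local puts.
                        store.put(record.key(), record.value());
                    }
                    consumer.commitSync(); // each node tracks its own offsets and can rewind independently
                }
            }
        }
    }

Because each serving node runs its own copy of this loop against its own partitions, a node can rewind or catch up without affecting the other replicas, which is the offset-tracking advantage discussed next.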
> > Pulling also makes the tracking of offsets across different nodes a bit
> > easier, since each node can consume at its own pace, and resume at whatever
> > point in the past it needs (i.e. rewind) without affecting the other
> > replicas. Tracking offsets across many replicas in the push model is a bit
> > more annoying, though still doable, of course.
> >
> > --
> >
> > Felix GV
> > Data Infrastructure Engineer
> > Distributed Data Systems
> > LinkedIn
> >
> > fgv@linkedin.com
> > linkedin.com/in/felixgv
> >
> > ________________________________________
> > From: Roger Hoover [roger.hoover@gmail.com]
> > Sent: Tuesday, March 31, 2015 8:57 PM
> > To: dev@samza.apache.org
> > Subject: Re: How do you serve the data computed by Samza?
> >
> > Ah, thanks for the great explanation. Any particular reason that the
> > job(s) you described should not be Samza jobs?
> >
> > We've started experimenting with such jobs for Druid and Elasticsearch.
> > For Elasticsearch, the Samza job containers join the Elasticsearch cluster
> > as transport nodes and use the Java API to push to ES data nodes. Likewise
> > for Druid, the Samza job uses the Tranquility API to schedule jobs (
> > https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
> > ).
> >
> > The nice part about push versus pull is that the downstream system does not
> > need plugins (like ES rivers) that may complicate its configuration or
> > destabilize the system.
> >
> > Cheers,
> >
> > Roger
> >
> > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV wrote:
> >
> > > Thanks for your reply Roger! Very insightful (:
> > >
> > > > 6. If there was a highly-optimized and reliable way of ingesting
> > > > partitioned streams quickly into your online serving system, would that
> > > > help you leverage Samza more effectively?
> > >
> > > >> 6. Can you elaborate please?
> > >
> > > Sure. The feature set I have in mind is the following:
> > >
> > > * Provide a thinly-wrapped Kafka producer which does appropriate
> > > partitioning and includes useful metadata (such as production timestamp,
> > > etc.) alongside the payload. This producer would be used in the last step
> > > of processing of a Samza topology, in order to emit to Kafka some
> > > processed/joined/enriched data which is destined for online serving.
> > > * Provide a consumer process which can be co-located on the same hosts
> > > as your data serving system. This process consumes from the appropriate
> > > partitions and checkpoints its offsets on its own. It leverages Kafka
> > > batching and compression to make consumption very efficient.
> > > * For each record, the consumer process issues a put/insert locally to
> > > the co-located serving process. Since this is a local operation, it is also
> > > very cheap and efficient.
> > > * The consumer process can also optionally throttle its insertion rate
> > > by monitoring some performance metrics of the co-located data serving
> > > process.
> > > For example, if the data serving process exposes a p99 latency via
> > > JMX or other means, this can be used in a tight feedback loop to back off
> > > if read latency degrades beyond a certain threshold.
> > > * This ingestion platform should be easy to integrate with any
> > > consistently-routed data serving system, by implementing some simple
> > > interfaces to let the ingestion system understand the key-to-partition
> > > assignment strategy, as well as the partition-to-node assignment strategy.
> > > Optionally, a hook to access performance metrics could also be implemented
> > > if throttling is deemed important (as described in the previous point).
> > > * Since the consumer lives in a separate process, the system
> > > benefits from good isolation guarantees. The consumer process can be capped
> > > to a low amount of heap, and its GC is inconsequential for the serving
> > > platform. It's also possible to bounce the consumer and data serving
> > > processes independently of each other, if need be.
> > >
> > > There are some more nuances and additional features which could be nice to
> > > have, but that's the general idea.
> > >
> > > It seems to me like such a system would be valuable, but I'm wondering what
> > > other people in the open-source community think, which is why I was
> > > interested in starting this thread...
> > >
> > > Thanks for your feedback!
> > >
> > > -F
> > >
> >
>

--
Thanks and regards

Chinmay Soman
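As a rough illustration of the integration points sketched in the feature list quoted above, the interfaces below show one possible shape for the key-to-partition and partition-to-node strategies, plus an optional JMX-based metrics hook for throttling. All class, interface, MBean, and attribute names here are hypothetical, not part of any existing API:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class IngestionHooks {

        /** Maps a record key to a partition, mirroring the producer-side partitioning. */
        interface KeyToPartitionStrategy {
            int partitionFor(byte[] key, int numPartitions);
        }

        /** Maps a partition to the serving node that owns it. */
        interface PartitionToNodeStrategy {
            String nodeForPartition(int partition);
        }

        /** Optional hook the consumer can use to throttle its insertion rate. */
        interface ServingNodeMetrics {
            double readLatencyP99Millis() throws Exception;
        }

        /** Example metrics hook reading a (hypothetical) p99 gauge from the co-located process's JMX endpoint. */
        static class JmxServingNodeMetrics implements ServingNodeMetrics {
            // JMX endpoint of the co-located serving process (hypothetical port).
            private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi";

            @Override
            public double readLatencyP99Millis() throws Exception {
                // For a sketch we reconnect on every call; a real implementation would reuse the connection.
                JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL));
                try {
                    MBeanServerConnection conn = connector.getMBeanServerConnection();
                    ObjectName gauge = new ObjectName("serving:type=ReadLatency,name=p99"); // hypothetical MBean
                    return ((Number) conn.getAttribute(gauge, "Value")).doubleValue();
                } finally {
                    connector.close();
                }
            }
        }

        /** Tight feedback loop: pause ingestion while read latency is above the threshold. */
        static void maybeBackOff(ServingNodeMetrics metrics, double thresholdMillis) throws Exception {
            while (metrics.readLatencyP99Millis() > thresholdMillis) {
                Thread.sleep(1000); // back off until the co-located serving node recovers
            }
        }
    }

A consumer loop like the one sketched earlier in this thread could call maybeBackOff() before each batch of local puts, so ingestion pauses whenever the serving node's read latency stays above the configured threshold.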