From: Dibyendu Bhattacharya
To: dev@spark.apache.org
Date: Wed, 14 Oct 2015 15:46:08 +0530
Subject: Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

Hi,

I have raised a JIRA (https://issues.apache.org/jira/browse/SPARK-11045) to track the discussion, but I am also mailing the dev group for your opinions. Some discussion has already happened in the JIRA, and I would love to hear what others think. You can comment directly on the JIRA if you wish.

This Kafka consumer has been available in spark-packages for a while (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), and I see many people have started using it. I am now thinking of contributing it back to the Apache Spark core project so that it can get better support, visibility, and adoption.

A few points about this consumer:

*Why this is needed, and how I position this consumer:*

This consumer is NOT a replacement for the existing DirectStream API. DirectStream solves the problems of "exactly once" semantics and "global ordering" of messages, but it achieves this at a cost: the offsets must be maintained externally, parallelism while processing the RDD is limited (each RDD partition corresponds to exactly one Kafka partition), and processing latency is higher (messages are fetched only when the RDD is processed). There are many users who do not need "exactly once" or "global ordering" of messages, or who manage ordering in an external store (say HBase), and who want more parallelism and lower latency in their streaming channel. For them, Spark currently has no good fallback in the form of a receiver-based API: the present receiver-based API uses the Kafka high-level consumer API, which performs poorly and has serious issues. [For this reason Kafka is coming up with a new high-level consumer API in 0.9.]
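To make the positioning concrete, here is a rough sketch of how the two existing modes are wired up against the Spark 1.x Kafka integration (the broker, ZooKeeper quorum, and topic names are made up for illustration):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-modes").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(10))

// DirectStream: one RDD partition per Kafka partition, offsets tracked by the
// application, and messages fetched only when each batch is processed.
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("metadata.broker.list" -> "broker1:9092"),
  Set("events"))

// Receiver-based mode: backed by the Kafka high-level consumer API, which is
// the part this proposal wants to replace with a low-level implementation.
val receiverBased = KafkaUtils.createStream(
  ssc,
  "zk1:2181",
  "my-consumer-group",
  Map("events" -> 4), // topic -> number of consumer threads
  StorageLevel.MEMORY_AND_DISK_SER)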
The consumer I implemented uses the Kafka low-level API, which gives better performance. It has built-in fault-tolerance features covering recovery from all failure modes. It extends code from the Storm Kafka spout, which has been around for some time, has matured over the years, and has all of Kafka's fault-tolerance handling built in. This same Kafka consumer for Spark is running in various production scenarios today and has already been adopted by many in the Spark community.

*Why can't we fix the existing receiver-based API in Spark?*

That is not possible unless we move to the Kafka low-level API, or wait for Kafka 0.9, where the high-level consumer API is being rewritten, and then build yet another consumer for Kafka 0.9 users. That approach does not seem good to me. The Kafka low-level API that I used in my consumer (and that even DirectStream uses) is not going to be deprecated in the near future. So if the Kafka consumer for Spark uses the low-level API for receiver-based mode, all Kafka users, whether they are on 0.8.x today or will move to 0.9, benefit from the same API. That makes maintenance easier, since there is a single API to manage across Kafka versions, and it ensures that both DirectStream and receiver mode use the same Kafka API.

*Concerns around low-level API complexity*

Yes, implementing a reliable consumer using the Kafka low-level consumer API is complex. But the same has been done for the Storm Kafka spout, and it has been stable for quite some time. This consumer for Spark is battle-tested under various production loads, gives much better performance than the existing Kafka consumers for Spark, and has a better fault-tolerance approach than the existing receiver-based mode. I do not think code complexity should be a major reason to deny the community a stable, high-performance consumer. I am happy for anyone interested to benchmark it against the other Kafka consumers for Spark and run various fault tests to verify what I am saying.
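To give a feel for what "low level" means here, below is a minimal sketch of a single fetch against the Kafka 0.8 SimpleConsumer API. Everything that makes a reliable receiver genuinely complex (leader discovery, leader re-election handling, retries, offset checkpointing) is deliberately omitted, and the host and topic names are made up:

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

val topic = "events"
val partition = 0

// With the low-level API the client, not the broker, is responsible for
// finding the partition leader and tracking its own offsets.
val consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "spark-receiver")

val request = new FetchRequestBuilder()
  .clientId("spark-receiver")
  .addFetch(topic, partition, 0L /* offset */, 1024 * 1024 /* fetchSize */)
  .build()

val response = consumer.fetch(request)
if (response.hasError) {
  // e.g. NotLeaderForPartition: the caller must rediscover the leader and retry.
  println(s"fetch failed with error code ${response.errorCode(topic, partition)}")
} else {
  for (msgAndOffset <- response.messageSet(topic, partition)) {
    val payload = msgAndOffset.message.payload // ByteBuffer holding the raw message
    println(s"offset ${msgAndOffset.offset}: ${payload.remaining()} bytes")
  }
}
consumer.close()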
*Why can't this consumer continue to live in spark-packages?*

It can. But what I see is that many users who want to fall back to receiver-based mode, because they may not need "exactly once" semantics or "global ordering", seem a little tentative about relying on a spark-packages library for their critical streaming pipeline, and so are forced onto the faulty and buggy mode built on the Kafka high-level API. Making this consumer part of the Spark project would give it much higher adoption and support from the community.

*Some major features of this consumer:*

This consumer controls the rate limit by maintaining a constant block size, whereas the default rate limiting in other Spark consumers is done by number of messages. That is a problem when Kafka holds messages of different sizes: with rate control by message count there is no deterministic way to know the actual block sizes and memory utilization.

This consumer has a built-in PID controller that controls the rate of consumption, again by modifying the block size, so that it consumes from Kafka only as many messages as are needed. The default Spark consumer instead fetches a chunk of messages and then applies throttling to control the rate, which can lead to excess I/O while consuming from Kafka.
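As a rough illustration of that design (this is not the actual code in the package; the gains and bounds are invented for the example), a PID controller over block size could look like this:

// Minimal PID controller sketch: adjust the fetch (block) size so that the
// observed bytes per batch converge on a target, instead of fetching a big
// chunk first and throttling afterwards. Gains and bounds are illustrative.
class BlockSizePidController(
    targetBytesPerBatch: Double,
    kp: Double = 0.75,
    ki: Double = 0.05,
    kd: Double = 0.10,
    minBlockSize: Int = 64 * 1024,
    maxBlockSize: Int = 8 * 1024 * 1024) {

  private var integral = 0.0
  private var lastError = 0.0
  private var blockSize = minBlockSize

  // Called once per batch with the bytes actually consumed; returns the
  // block size to use for the next fetch.
  def nextBlockSize(observedBytes: Double): Int = {
    val error = targetBytesPerBatch - observedBytes
    integral += error
    val derivative = error - lastError
    lastError = error

    val adjustment = kp * error + ki * integral + kd * derivative
    blockSize = math.min(maxBlockSize, math.max(minBlockSize, (blockSize + adjustment).toInt))
    blockSize
  }
}

The point of the design is that the controller changes how much is fetched the next time around, so nothing already fetched has to be discarded or throttled after the fact.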
There are other features in this consumer which we can discuss at length once we are convinced that the Kafka low-level API is the way to go.

Regards,
Dibyendu