spark-issues mailing list archives

From "Ben Teeuwen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-13939) Kafka createDirectStream not parallelizing properly
Date Thu, 24 Mar 2016 07:13:25 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209901#comment-15209901 ]

Ben Teeuwen commented on SPARK-13939:
-------------------------------------

Thanks a lot Cody for narrowing the search space. The problem was the format of the data going
into the kafka producer: tuples with an empty key and the actual event as the value.

At http://stackoverflow.com/questions/26553412/produce-kafka-message-to-selected-partition,
they mention "The partitioner class for partitioning messages amongst sub-topics. The default
partitioner is based on the hash of the key." Now I noticed, but never questioned, that incoming
events from Kafka are tuples. The key is empty, the value contains the actual event.
So the first thing I did in Spark was (see also the pastes above):

{code}
  stream_it = (directKafkaStream
    # raw event json becomes a dictionary; the empty kafka key is ignored
    .map(lambda (key, js_string): json.loads(js_string)))
{code}

The hash of that empty key produced by the kafka-console-producer is always the same, so every
message is sent to a single partition. We changed the key to a random number and now it works smoothly :).
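
A minimal producer-side sketch of that fix, assuming the kafka-python client (topic name, broker address and the event payload are placeholders, not our actual setup). Because the default partitioner hashes the key, a random key spreads messages over all partitions:

{code}
import json
import random

from kafka import KafkaProducer  # kafka-python client, used here only for illustration

producer = KafkaProducer(bootstrap_servers="broker1:9092")  # placeholder broker

event = {"epoch": 1458151752}  # hypothetical event payload
producer.send(
    "test",  # placeholder topic
    key=str(random.randint(0, 10 ** 9)).encode("utf-8"),  # random key -> hashed to a random partition
    value=json.dumps(event).encode("utf-8"))
producer.flush()
{code}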

> Kafka createDirectStream not parallelizing properly
> ---------------------------------------------------
>
>                 Key: SPARK-13939
>                 URL: https://issues.apache.org/jira/browse/SPARK-13939
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Ben Teeuwen
>         Attachments: 0CEAF9A0-6637-44BB-95B2-2529992723A2.png, 215B28E2-638B-494C-8084-FD46E9984522.png,
>                      4E119936-14E3-490E-A885-7D2E2CB2940F.png, 9F0FF528-85DF-475D-9507-8FBF93C46750.png,
>                      ECBE2DFF-6B35-48C5-B692-B9A80FC1E3F5.png, screenshot-1.png
>
>
> I’m trying to get a streaming app running using pyspark (1.6.0), Kafka and the receiverless
> direct approach ‘createDirectStream’. But it seemingly has problems with the degree of
> parallelism in Spark. I’ve written the app both in Scala and Pyspark; both exhibit the same
> behavior.
> Context:
> - stream with 10-30k events per 10-second batch.
> - kafka topic has 10 partitions.
> - createDirectStream with only metadata.broker.list in the kafkaParams, containing 4 brokers.
> - 10 executors with 2 cores each, 3 GB RAM per executor + 3 GB driver memory.
> - backpressure on
> - not using speculative execution
> - simple logic: parse json, create a key-value tuple, flatMap, reduceByKey, pprint to screen
> (a sketch of this setup follows below). It is supposed to keep track of state, but for now
> I'm unfortunately already having issues with simply printing the minimum and maximum epoch.
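> A minimal pyspark sketch of the setup described above (broker addresses and the repartition
> factor of 20 are assumptions, not values from the actual app):
> {code}
> from pyspark import SparkConf, SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> conf = SparkConf().set("spark.streaming.backpressure.enabled", "true")  # backpressure on
> sc = SparkContext(conf=conf)
> ssc = StreamingContext(sc, 10)  # 10 second batch interval
>
> # receiverless direct stream; kafkaParams contain only metadata.broker.list
> directKafkaStream = KafkaUtils.createDirectStream(
>     ssc, ["test"], {"metadata.broker.list": "broker1:9092,broker2:9092,broker3:9092,broker4:9092"})
>
> # first thing: repartition to spread the events evenly over all executors
> # (20 = 10 executors x 2 cores is an assumption)
> stream = directKafkaStream.repartition(20)
> {code}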
> At the start of the streaming (e.g. started just now at 19.07):
> First thing I do is repartition to spread the events evenly over all the executors. Looking
> at the streaming tab > batch details > Input Metadata, I see it ingests from only 1
> kafka partition:
> {code}
> Kafka direct stream [0]	
>     topic: test    partition: 1    offsets: 16630012 to 16639226
> {code}
> One executor is doing the repartitioning, and is taking more than the batch interval
> time. So backpressure kicks in. The events ingested are trimmed down to 100. That gets processed
> in 2 seconds. Then slowly, more Kafka partitions are being used. E.g. 10 minutes later:
> {code}
> Kafka direct stream [0]	
>     topic: test    partition: 9    offsets: 16262300 to 16262400
>     topic: test    partition: 1    offsets: 16683171 to 16683271
> {code}
> When running for a day, the number of kafka partitions it ingests from stabilizes around
> 3-6. But it never ingests the full stream, even though it has more partitions to ingest
> from in parallel and executors to utilize. E.g. half an hour later:
> {code}
> Kafka direct stream [0]	
>     topic: test    partition: 9    offsets: 16327090 to 16328090
>     topic: test    partition: 6    offsets: 17140538 to 17141538
>     topic: test    partition: 0    offsets: 22776394 to 22777394
>     topic: test    partition: 1    offsets: 16747961 to 16748961
>     topic: test    partition: 7    offsets: 15090120 to 15091120
> {code}
> So it loses a lot of events, and it processes older events in later batches. E.g.
> printing min/max timestamps shows events going back almost to the start of the streaming
> app:
> {code}
> #### Printing at 16-03-16 19:36:33
> ### min 16-03-16 19:09:12 (epoch = 1458151752)
> #### Printing at 16-03-16 19:36:34
> ### max 16-03-16 19:31:51 (epoch = 1458153111)
> #### Printing at 16-03-16 19:36:42
> ### min 16-03-16 19:09:12 (epoch = 1458151752)
> #### Printing at 16-03-16 19:36:43
> ### max 16-03-16 19:31:51 (epoch = 1458153111)
> {code}
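> A rough sketch of the min/max printing that produces output like the above, building on
> {{stream}} from the earlier sketch (the "epoch" field name is an assumption, and the actual
> app also keeps state across batches, which is omitted here):
> {code}
> import json
>
> # parse the raw json events and pull out the epoch field
> epochs = (stream
>     .map(lambda (key, js_string): json.loads(js_string))
>     .map(lambda d: d["epoch"]))
>
> # per batch, reduce to the minimum and maximum epoch and print both
> (epochs.map(lambda e: ("min", e)).reduceByKey(min)
>     .union(epochs.map(lambda e: ("max", e)).reduceByKey(max))
>     .pprint())
> {code}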
> My take from the ‘Simplified Parallelism’ bullet in the docs (http://spark.apache.org/docs/latest/streaming-kafka-integration.html)
> is not to worry about parallelism, as long as I provide sufficient resources. And 10 executors
> with 2 cores each, receiving from a kafka stream with 10 partitions containing 10-30k events
> per 10 seconds, seems plentiful.
> (This was discussed during the Amsterdam Spark Meetup on March 14 2016 with [~holdenk_amp],
> and she advised writing it up in a ticket here.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

