spark-dev mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: DynamicPartitionKafkaRDD - 1:n mapping between kafka and RDD partition
Date Tue, 15 Mar 2016 15:35:41 GMT
No, I don't agree that someone explicitly calling repartition or
shuffle is the same as a constructor that implicitly breaks
guarantees.

Realistically speaking, the changes you have made are also totally
incompatible with the way kafka's new consumer works. Pulling
different out-of-order chunks of the same topicpartition from
different consumer nodes is going to make prefetch optimizations
useless.
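To make the ordering concern concrete: Kafka only orders messages within a single topicpartition, so any scheme that hands disjoint offset sub-ranges of one partition to different tasks can see them complete in any order. A minimal Python sketch (illustrative names only, not Spark's or Kafka's actual API):

```python
# Kafka guarantees ordering only within a topicpartition: a consumer
# reading offsets 0..99 of one partition sees them in offset order.
# If that range is split into chunks handled by independent tasks,
# completion order is arbitrary and the per-partition guarantee is lost.

def split_offset_range(from_offset, until_offset, num_chunks):
    """Split [from_offset, until_offset) into num_chunks contiguous chunks."""
    total = until_offset - from_offset
    base, rem = divmod(total, num_chunks)
    chunks, start = [], from_offset
    for i in range(num_chunks):
        size = base + (1 if i < rem else 0)
        chunks.append((start, start + size))
        start += size
    return chunks

chunks = split_offset_range(0, 100, 3)   # [(0, 34), (34, 67), (67, 100)]

# Independent tasks may finish in any order, e.g. the last chunk first:
completion_order = [chunks[2], chunks[0], chunks[1]]
offsets_seen = [o for (lo, hi) in completion_order for o in range(lo, hi)]
# All offsets are still covered exactly once, but offset order is broken.
```

The same data arrives, but any downstream logic that assumed in-offset-order processing per partition no longer holds.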



On Mon, Mar 14, 2016 at 6:16 PM, Renyi Xiong <renyixiong0@gmail.com> wrote:
> right.
>
> However, I think it's the developer's choice to purposely drop the
> guarantee, as when they use the existing DStream.repartition, where the
> original per-topicpartition in-order processing is also no longer observed.
>
> Do you agree?
>
> On Thu, Mar 10, 2016 at 12:12 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> The central problem with doing anything like this is that you break
>> one of the basic guarantees of kafka, which is in-order processing on
>> a per-topicpartition basis.
>>
>> As far as PRs go, because of the new consumer interface for kafka 0.9
>> and 0.10, there's a lot of potential change already underway.
>>
>> See
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> On Thu, Mar 10, 2016 at 1:59 PM, Renyi Xiong <renyixiong0@gmail.com>
>> wrote:
>> > Hi TD,
>> >
>> > Thanks a lot for offering to look at our PR (if we file one) at the
>> > conference in NYC.
>> >
>> > As we briefly discussed, unbalanced and under-distributed Kafka
>> > partitions are an issue when developing a Spark Streaming application in
>> > Mobius (C# for Spark), so we're trying the option of repartitioning
>> > within DirectKafkaInputDStream instead of the DStream.repartition API,
>> > which introduces extra network cost and doesn't really solve the root
>> > cause.
>> >
>> > However, instead of filing a JIRA with a PR directly, we decided to
>> > create a customized Kafka RDD / DStream (to start with, and contribute
>> > back later if successful) - DynamicPartitionKafkaRDD and
>> > DynamicPartitionKafkaInputDStream, using an inheritance model - and to
>> > expose a new API, KafkaUtils.CreateDirectStreamWithRepartition, with one
>> > more parameter: numPartitions (a hint for the number of RDD partitions
>> > to create).
>> >
>> > It'll be great if you can take a look at the code and share your
>> > comments:
>> >
>> >
>> > https://github.com/Microsoft/Mobius/tree/master/scala/src/main/org/apache/spark/streaming/api/kafka
>> >
>> > The major relevant change is in DynamicPartitionKafkaRDD.getPartitions,
>> > where an average RDD partition size is calculated first (total size of
>> > the topic divided by numPartitions) and then used to split Kafka
>> > partitions (the final number of RDD partitions will be greater than or
>> > equal to numPartitions).
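The split described above can be sketched in plain Python (illustrative names and tuple shapes only; the actual logic lives in DynamicPartitionKafkaRDD.getPartitions in Scala):

```python
import math

# Sketch of the described scheme: compute an average chunk size
# (total backlog / numPartitions) and cut each Kafka partition's offset
# range into pieces no larger than that average. The resulting number of
# RDD partitions is therefore >= numPartitions.

def split_partitions(offset_ranges, num_partitions):
    """offset_ranges: list of (topic, partition, from_offset, until_offset)."""
    total = sum(until - frm for (_, _, frm, until) in offset_ranges)
    avg = max(1, total // num_partitions)  # target messages per RDD partition
    rdd_partitions = []
    for (topic, part, frm, until) in offset_ranges:
        size = until - frm
        pieces = max(1, math.ceil(size / avg))  # how many chunks this partition needs
        base, rem = divmod(size, pieces)
        start = frm
        for i in range(pieces):
            step = base + (1 if i < rem else 0)
            rdd_partitions.append((topic, part, start, start + step))
            start += step
    return rdd_partitions

# One hot partition (900 messages) and one small one (100), numPartitions = 4:
ranges = [("t", 0, 0, 900), ("t", 1, 0, 100)]
parts = split_partitions(ranges, 4)
# The hot partition is split into 4 chunks of 225; the small one stays whole,
# giving 5 RDD partitions in total (>= the numPartitions hint of 4).
```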
>> >
>> > There's a concern that Kafka partition[i] no longer maps to task[i],
>> > which might break existing applications. Here's our thinking:
>> >
>> > a. OffsetRanges in the original implementation may span multiple
>> > topics, meaning 'partition i maps to task i' is generally a false
>> > statement
>> >
>> > b. Even if only one topic is involved, the partition order in
>> > offsetRanges comes from Kafka's topic metadata response, which doesn't
>> > necessarily guarantee the order; even if it did, applications should
>> > not take that dependency
>> >
>> > c. Topic partition split happens only when configured
>> >
>> >
>> > There are some other, more complicated changes related to fault
>> > tolerance which are irrelevant here (but you're more than welcome to
>> > comment on them too); they were introduced to unblock scenarios we're
>> > experiencing on a daily basis.
>> >
>> > 1. Temporarily redirect Kafka reads to the C# worker by passing
>> > metadata instead of actual Kafka messages to it; in the C# worker, a
>> > C# version of the Kafka client is used, which enables much easier
>> > debugging
>> >
>> > 2. Bypass metadata request exceptions on the driver side and let the
>> > next batch retry
>> >
>> > 3. Bypass some read errors on the worker side
>> >
>> >
>> > Note all the above is at a very early stage; your comments will be
>> > much valued and appreciated.
>> >
>> >
>> > Thanks a lot,
>> >
>> > Reny.
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org

