spark-user mailing list archives

From "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Subject Re: Structured Streaming - Kafka
Date Tue, 07 Mar 2017 22:04:45 GMT
Good catch. Could you create a ticket? You can also submit a PR to fix it
if you have time :)

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris <chris.bowden@hpe.com> wrote:

> Potential bug when using startingOffsets = SpecificOffsets with Kafka
> topics containing uppercase characters?
>
> KafkaSourceProvider#L80/86:
>
> val startingOffsets =
>   caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
>     case Some("latest") => LatestOffsets
>     case Some("earliest") => EarliestOffsets
>     case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
>     case None => LatestOffsets
>   }
>
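Because `.map(_.trim.toLowerCase)` runs before the `match`, a value such as `{"MyTopic":{"0":23}}` reaches `JsonUtils.partitionOffsets` as `{"mytopic":{"0":23}}`. The following is a minimal sketch of one possible fix. It compares only the `latest`/`earliest` keywords case-insensitively and passes every other value through with its case intact. The types here (`StartingOffsets`, and a `SpecificOffsets` that keeps the raw JSON string instead of parsed offsets) and the `parseStartingOffsets` helper are simplified, hypothetical stand-ins, not Spark's actual classes.

```scala
// Hypothetical, simplified stand-ins for Spark's starting-offset types.
sealed trait StartingOffsets
case object LatestOffsets extends StartingOffsets
case object EarliestOffsets extends StartingOffsets
// Keeps the raw JSON text; Spark's real SpecificOffsets holds parsed offsets.
final case class SpecificOffsets(json: String) extends StartingOffsets

object StartingOffsetsParser {
  // Hypothetical helper: match keywords case-insensitively, but never
  // rewrite the value itself, so topic names in JSON keep their case.
  def parseStartingOffsets(option: Option[String]): StartingOffsets =
    option.map(_.trim) match {
      case Some(v) if v.equalsIgnoreCase("latest")   => LatestOffsets
      case Some(v) if v.equalsIgnoreCase("earliest") => EarliestOffsets
      case Some(json)                                => SpecificOffsets(json)
      case None                                      => LatestOffsets
    }

  def main(args: Array[String]): Unit = {
    val json = """{"MyTopic":{"0":23}}"""
    println(parseStartingOffsets(Some("  LATEST "))) // LatestOffsets
    println(parseStartingOffsets(Some(json)))        // SpecificOffsets({"MyTopic":{"0":23}})
    println(parseStartingOffsets(None))              // LatestOffsets
  }
}
```

Because `equalsIgnoreCase` only compares and never rewrites the string, the JSON branch receives the trimmed text exactly as the user wrote it.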
> Topic names in the JSON get lowercased, so the underlying assignments in the
> consumer are incorrect and the assertion in KafkaSource#L326 fails:
>
> private def fetchSpecificStartingOffsets(
>     partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
>   val result = withRetriesWithoutInterrupt {
>     // Poll to get the latest assigned partitions
>     consumer.poll(0)
>     val partitions = consumer.assignment()
>     consumer.pause(partitions)
>     assert(partitions.asScala == partitionOffsets.keySet,
>       "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
>         "Use -1 for latest, -2 for earliest, if you don't care.\n" +
>         s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
>
>
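The assertion fires because topic names are compared case-sensitively. Kafka's `TopicPartition` uses ordinary string equality for the topic, so `MyTopic-0` and `mytopic-0` are different keys and the two sets can never be equal. The sketch below reproduces that comparison. It uses a hypothetical `TopicPartition` case class standing in for Kafka's and a hypothetical `missingOrExtra` helper that reports which partitions differ on each side, which is roughly the information the assertion message prints.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition;
// case-class equality compares the topic string case-sensitively.
final case class TopicPartition(topic: String, partition: Int)

object AssignmentCheck {
  // Hypothetical helper mirroring the set check in fetchSpecificStartingOffsets:
  // returns (assigned but not specified, specified but not assigned).
  def missingOrExtra(
      assigned: Set[TopicPartition],
      specified: Set[TopicPartition]): (Set[TopicPartition], Set[TopicPartition]) =
    (assigned -- specified, specified -- assigned)

  def main(args: Array[String]): Unit = {
    // What the consumer is actually assigned for topic "MyTopic".
    val assigned = Set(TopicPartition("MyTopic", 0), TopicPartition("MyTopic", 1))
    // What the provider derives after lowercasing the JSON option value.
    val specified = Set(TopicPartition("mytopic", 0), TopicPartition("mytopic", 1))

    println(assigned == specified) // false, so the assert would fire
    val (unspecified, unknown) = missingOrExtra(assigned, specified)
    println(s"Assigned but not specified: $unspecified")
    println(s"Specified but not assigned: $unknown")
  }
}
```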
