spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Date Wed, 05 Oct 2016 09:27:53 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15102#discussion_r81930482
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
---
    @@ -0,0 +1,396 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
    +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.kafka010.KafkaSource._
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to read data from Kafka. The
    + * design for this source is as follows.
    + *
    + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + *   a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + *   example, if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + *   KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
    + *   consistent with the semantics of `KafkaConsumer.position()`.
    + *
    + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
    + *   by this source. These strategies directly correspond to the different consumption
    + *   options. This class is designed to return a configured [[KafkaConsumer]] that is used
    + *   by the [[KafkaSource]] to query for the offsets. See the docs on
    + *   [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for more details.
    + *
    + * - The [[KafkaSource]] is written to do the following.
    + *
    + *   - As soon as the source is created, the pre-configured KafkaConsumer returned by the
    + *     [[ConsumerStrategy]] is used to query the initial offsets that this source should
    + *     start reading from. This is used to create the first batch.
    + *
    + *   - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are
    + *     returned as a [[KafkaSourceOffset]].
    + *
    + *   - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset'
    + *     for each partition. The end offset is excluded to be consistent with the semantics of
    + *     [[KafkaSourceOffset]] and `KafkaConsumer.position()`.
    + *
    + *   - The DF returned is based on [[KafkaSourceRDD]], which is constructed such that the
    + *     data from a Kafka topic + partition is consistently read by the same executors across
    + *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See
    + *     the docs on [[KafkaSourceRDD]] for more details.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
    + * the user must make sure all messages in a topic have been processed before deleting the
    + * topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using KafkaSource may not be able
    + * to be stopped. To avoid this issue, make sure to stop the query before stopping the
    + * Kafka brokers, and do not use incorrect broker addresses.
    + */
    +private[kafka010] case class KafkaSource(
    +    sqlContext: SQLContext,
    +    consumerStrategy: ConsumerStrategy,
    +    executorKafkaParams: ju.Map[String, Object],
    +    sourceOptions: Map[String, String],
    +    metadataPath: String,
    +    failOnDataLoss: Boolean)
    +  extends Source with Logging {
    +
    +  private val sc = sqlContext.sparkContext
    +
    +  private val pollTimeoutMs =
    +    sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
    +
    +  private val maxOffsetFetchAttempts =
    +    sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
    +
    +  private val offsetFetchAttemptIntervalMs =
    +    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
    +
    +  /**
    +   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries
    +   * the offsets and never commits them.
    +   */
    +  private val consumer = consumerStrategy.createConsumer()
    +
    +  /**
    +   * Lazy set initialPartitionOffsets to make sure only call `KafkaConsumer.poll` in
    +   * StreamExecutionThread. Otherwise, interrupting a thread running `KafkaConsumer.poll`
    +   * may hang forever (KAFKA-1894).
    +   */
    --- End diff --
    
    nit:
    - Lazily initialize initialPartitionOffsets to make sure that `KafkaConsumer.poll` is only
      called in StreamExecutionThread.
    - interrupting a thread *while* running
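
The offset convention the diff's Scaladoc describes (the stored offset is 1 + the last available offset, matching `KafkaConsumer.position()`, with an exclusive end offset in `getBatch()`) can be sketched as follows. This is a standalone illustrative sketch, not the PR's code: the `OffsetConventionSketch` object and the `(topic, partition)` tuple standing in for Kafka's `TopicPartition` are assumptions made to keep it self-contained.

```scala
// Sketch of the KafkaSourceOffset convention: store the position of the
// *next* record to read, i.e. 1 + (last available offset).
object OffsetConventionSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition.
  type TopicPartition = (String, Int)

  // If the last record in topic "t", partition 2 has offset 5, the source
  // records 6, consistent with KafkaConsumer.position().
  def recordedOffset(lastAvailableOffset: Long): Long = lastAvailableOffset + 1

  // getBatch reads [fromOffset, untilOffset): the end offset is exclusive,
  // so the record count per partition is a plain difference.
  def recordsInBatch(fromOffset: Long, untilOffset: Long): Long =
    untilOffset - fromOffset

  def main(args: Array[String]): Unit = {
    val offsets: Map[TopicPartition, Long] = Map(("t", 2) -> recordedOffset(5L))
    println(offsets(("t", 2)))     // 6
    println(recordsInBatch(6L, 10L)) // offsets 6, 7, 8, 9 -> 4 records
  }
}
```

With this convention, the stored offset for one batch's end doubles as the next batch's start, with no off-by-one adjustment.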
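
The lazy initialization the comment (and the nit) refers to can be illustrated with Scala's `lazy val`, which defers evaluation until first access; in KafkaSource that first access happens on the StreamExecutionThread, so the blocking `KafkaConsumer.poll` is never run on a thread that might be interrupted at construction time (KAFKA-1894). This is a minimal sketch of the pattern, not the PR's code; `fetchInitialOffsets` is a hypothetical stand-in for the poll-based offset fetch.

```scala
object LazyInitSketch {
  // Counts how many times the expensive fetch actually runs.
  var pollCalls = 0

  // Hypothetical stand-in for the KafkaConsumer.poll-based offset query.
  def fetchInitialOffsets(): Map[Int, Long] = {
    pollCalls += 1
    Map(0 -> 6L, 1 -> 42L)
  }

  // Not evaluated when the object is constructed; evaluated exactly once,
  // on the thread that first touches it.
  lazy val initialPartitionOffsets: Map[Int, Long] = fetchInitialOffsets()
}
```

Accessing `LazyInitSketch.initialPartitionOffsets` triggers the fetch once; subsequent accesses reuse the cached result, so `pollCalls` stays at 1.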


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
