spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gaborgsomogyi <...@git.apache.org>
Subject [GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Date Wed, 11 Apr 2018 16:05:25 GMT
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20997#discussion_r180810282
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
---
    @@ -0,0 +1,381 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
    +import org.apache.kafka.common.{KafkaException, TopicPartition}
    +
    +import org.apache.spark.TaskContext
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] sealed trait KafkaDataConsumer[K, V] {
    +  /**
    +   * Get the record for the given offset if available.
    +   *
    +   * @param offset         the offset to fetch.
    +   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    +   */
    +  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
    +    internalConsumer.get(offset, pollTimeoutMs)
    +  }
    +
    +  /**
    +   * Start a batch on a compacted topic
    +   *
    +   * @param offset         the offset to fetch.
    +   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    +   */
    +  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
    +    internalConsumer.compactedStart(offset, pollTimeoutMs)
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   *
    +   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    +   */
    +  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
    +    internalConsumer.compactedNext(pollTimeoutMs)
    +  }
    +
    +  /**
    +   * Rewind to previous record in the batch from a compacted topic.
    +   *
    +   * @throws NoSuchElementException if no previous element
    +   */
    +  def compactedPrevious(): ConsumerRecord[K, V] = {
    +    internalConsumer.compactedPrevious()
    +  }
    +
    +  /**
    +   * Release this consumer from being further used. Depending on its implementation,
    +   * this consumer will be either finalized, or reset for reuse later.
    +   */
    +  def release(): Unit
    +
    +  /** Reference to the internal implementation that this wrapper delegates to */
    +  protected def internalConsumer: InternalKafkaConsumer[K, V]
    +}
    +
    +
    +/**
    + * A wrapper around Kafka's KafkaConsumer.
    + * This is not for direct use outside this file.
    + */
    +private[kafka010]
    +class InternalKafkaConsumer[K, V](
    +  val groupId: String,
    +  val topicPartition: TopicPartition,
    +  val kafkaParams: ju.Map[String, Object]) extends Logging {
    +
    +  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
    +    "groupId used for cache key must match the groupId in kafkaParams")
    +
    +  @volatile private var consumer = createConsumer
    +
    +  /** indicates whether this consumer is in use or not */
    +  @volatile var inUse = true
    +
    +  /** indicate whether this consumer is going to be stopped in the next release */
    +  @volatile var markedForClose = false
    +
    +  // TODO if the buffer was kept around as a random-access structure,
    +  // could possibly optimize re-calculating of an RDD in the same batch
    +  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
    +  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
    +
    +  override def toString: String = {
    +    "InternalKafkaConsumer(" +
    +      s"hash=${Integer.toHexString(hashCode)}, " +
    +      s"groupId=$groupId, " +
    +      s"topicPartition=$topicPartition)"
    +  }
    +
    +  /** Create a KafkaConsumer to fetch records for `topicPartition` */
    +  private def createConsumer: KafkaConsumer[K, V] = {
    +    val c = new KafkaConsumer[K, V](kafkaParams)
    +    val tps = new ju.ArrayList[TopicPartition]()
    +    tps.add(topicPartition)
    +    c.assign(tps)
    +    c
    +  }
    +
    +  def close(): Unit = consumer.close()
    +
    +  /**
    +   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    +   * Sequential forward access will use buffers, but random access will be horribly inefficient.
    +   */
    +  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
    +    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffset requested $offset")
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for $groupId $topicPartition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    require(buffer.hasNext(),
    +      s"Failed to get records for $groupId $topicPartition $offset after polling for
$timeout")
    +    var record = buffer.next()
    +
    +    if (record.offset != offset) {
    +      logInfo(s"Buffer miss for $groupId $topicPartition $offset")
    +      seek(offset)
    +      poll(timeout)
    +      require(buffer.hasNext(),
    +        s"Failed to get records for $groupId $topicPartition $offset after polling for
$timeout")
    +      record = buffer.next()
    +      require(record.offset == offset,
    +        s"Got wrong record for $groupId $topicPartition even after seeking to offset
$offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider
enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
    +    }
    +
    +    nextOffset = offset + 1
    +    record
    +  }
    +
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topicPartition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted
topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topicPartition $offset")
    +      seek(offset)
    +      poll(pollTimeoutMs)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) {
    +      poll(pollTimeoutMs)
    +    }
    +    require(buffer.hasNext(),
    +      s"Failed to get records for compacted $groupId $topicPartition " +
    +        s"after polling for $pollTimeoutMs")
    +    val record = buffer.next()
    +    nextOffset = record.offset + 1
    +    record
    +  }
    +
    +  /**
    +   * Rewind to previous record in the batch from a compacted topic.
    +   * @throws NoSuchElementException if no previous element
    +   */
    +  def compactedPrevious(): ConsumerRecord[K, V] = {
    +    buffer.previous()
    +  }
    +
    +  private def seek(offset: Long): Unit = {
    +    logDebug(s"Seeking to $topicPartition $offset")
    +    consumer.seek(topicPartition, offset)
    +  }
    +
    +  private def poll(timeout: Long): Unit = {
    +    val p = consumer.poll(timeout)
    +    val r = p.records(topicPartition)
    +    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    +    buffer = r.listIterator
    +  }
    +
    +}
    +
    +private[kafka010]
    +object KafkaDataConsumer extends Logging {
    +
    +  private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K,
V])
    +    extends KafkaDataConsumer[K, V] {
    +    assert(internalConsumer.inUse) // make sure this has been set to true
    --- End diff --
    
    Removed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message