spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jose-torres <...@git.apache.org>
Subject [GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Date Fri, 05 Jan 2018 01:33:51 GMT
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r159795735
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.clients.consumer.ConsumerRecord
    +import org.apache.kafka.common.TopicPartition
    +import org.apache.kafka.common.errors.WakeupException
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow}
    +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
    +import org.apache.spark.sql.catalyst.util.DateTimeUtils
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +class ContinuousKafkaReader(
    +    kafkaReader: KafkaOffsetReader,
    +    executorKafkaParams: java.util.Map[String, Object],
    +    sourceOptions: Map[String, String],
    +    metadataPath: String,
    +    initialOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
    +
    +  /**
    +   * Merges the per-partition offsets reported by the continuous tasks into a
    +   * single global [[KafkaSourceOffset]].
    +   *
    +   * Builds the map with `toMap` instead of `reduce(_ ++ _)`: `reduce` throws
    +   * UnsupportedOperationException on an empty array, and concatenating one
    +   * singleton Map per partition is quadratic. Non-Kafka PartitionOffsets
    +   * still fail with a MatchError, as before.
    +   */
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    val mergedMap = offsets.map {
    +      case KafkaSourcePartitionOffset(p, o) => p -> o
    +    }.toMap
    +    KafkaSourceOffset(mergedMap)
    +  }
    +
    +  // Lazily grab the active session; `.get` throws if none is active —
    +  // NOTE(review): presumably only forced on the driver where a session
    +  // exists; confirm.
    +  private lazy val session = SparkSession.getActiveSession.get
    +  private lazy val sc = session.sparkContext
    +
    +  // Consumer poll timeout in ms: the "kafkaConsumer.pollTimeoutMs" source
    +  // option if set, otherwise spark.network.timeout (default "120s").
    +  // NOTE(review): a non-numeric option value makes `.toLong` throw here.
    +  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
    +    "kafkaConsumer.pollTimeoutMs",
    +    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
    +  ).toLong
    +
    +  // Optional cap on offsets consumed per trigger; None when not configured.
    +  private val maxOffsetsPerTrigger =
    +    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = {
    --- End diff --
    
    oops, no


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message