From reviews-return-1167548-archive-asf-public=cust-asf.ponee.io@spark.apache.org Mon Sep 14 06:50:22 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id F0D7A180636 for ; Mon, 14 Sep 2020 08:50:21 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 3BDDC41843 for ; Mon, 14 Sep 2020 06:50:21 +0000 (UTC) Received: (qmail 3788 invoked by uid 500); 14 Sep 2020 06:50:21 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 3776 invoked by uid 99); 14 Sep 2020 06:50:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2020 06:50:21 +0000 From: GitBox To: reviews@spark.apache.org Subject: [GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API Message-ID: <160006622095.32230.12935076779044833060.asfpy@gitbox.apache.org> Date: Mon, 14 Sep 2020 06:50:20 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit In-Reply-To: References: gaborgsomogyi commented on a change in pull request #29729: URL: https://github.com/apache/spark/pull/29729#discussion_r487686563 ########## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ########## @@ -46,41 +49,35 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner
private[kafka010] class KafkaOffsetReader( consumerStrategy: ConsumerStrategy, val driverKafkaParams: ju.Map[String, Object], - readerOptions: CaseInsensitiveMap[String], - driverGroupIdPrefix: String) extends Logging { + readerOptions: CaseInsensitiveMap[String]) extends Logging { /** - * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an + * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an * [[UninterruptibleThread]], however for batch mode this is not the case. */ val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader") - /** - * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is - * created -- see SPARK-19564. - */ - private var groupId: String = null - private var nextId = 0 - - /** - * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the - * offsets and never commits them. 
- */ - @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null + @volatile protected var _admin: Admin = null - protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized { + protected def admin: Admin = synchronized { assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) - if (_consumer == null) { - val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams) - if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { - newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId()) - } - _consumer = consumerStrategy.createConsumer(newKafkaParams) + if (_admin == null) { + _admin = consumerStrategy.createAdmin(driverKafkaParams) } - _consumer + _admin } + def isolationLevel(): IsolationLevel = { + driverKafkaParams.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG) match { + case s: String => IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)) + case null => IsolationLevel.valueOf( + ConsumerConfig.DEFAULT_ISOLATION_LEVEL.toUpperCase(Locale.ROOT)) + } + } + + private def listOffsetsOptions() = new ListOffsetsOptions(isolationLevel()) Review comment: Same fix here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org