From: "Shixiong(Ryan) Zhu"
Date: Tue, 7 Mar 2017 14:04:45 -0800
Subject: Re: Structured Streaming - Kafka
To: "Bowden, Chris"
Cc: user, "Gudenkauf, Jack"
Mailing-List: contact user-help@spark.apache.org; run by ezmlm

Good catch. Could you create a ticket? You can also submit a PR to fix it if
you have time :)

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris wrote:

> Potential bug when using startingOffsets = SpecificOffsets with Kafka
> topics containing uppercase characters?
>
> KafkaSourceProvider#L80/86:
>
>   val startingOffsets =
>     caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
>       case Some("latest") => LatestOffsets
>       case Some("earliest") => EarliestOffsets
>       case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
>       case None => LatestOffsets
>     }
>
> Topics in JSON get lowered so underlying assignments in the consumer are
> incorrect, and the assertion in KafkaSource#L326 triggers:
>
>   private def fetchSpecificStartingOffsets(
>       partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
>     val result = withRetriesWithoutInterrupt {
>       // Poll to get the latest assigned partitions
>       consumer.poll(0)
>       val partitions = consumer.assignment()
>       consumer.pause(partitions)
>       assert(partitions.asScala == partitionOffsets.keySet,
>         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
>         "Use -1 for latest, -2 for earliest, if you don't care.\n" +
>         s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
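For anyone reading this in the archive: the behaviour Chris describes can be reproduced without Spark or Kafka. The following is only a minimal sketch under stated assumptions. The types are simplified stand-ins for the ones quoted above (here SpecificOffsets just holds the raw JSON string instead of parsed partitions), the names StartingOffsetsSketch, parseLowercasingEverything and parsePreservingCase are hypothetical, and the second function is one possible way to keep topic case intact, not the actual patch.

```scala
// Simplified stand-ins for the Spark types named in the thread.
// Assumption: SpecificOffsets keeps the raw JSON so the effect on topic
// names is visible without a JSON parser.
sealed trait StartingOffsets
case object LatestOffsets extends StartingOffsets
case object EarliestOffsets extends StartingOffsets
final case class SpecificOffsets(json: String) extends StartingOffsets

object StartingOffsetsSketch {

  // Mirrors the quoted pattern: the whole option value is lowercased before
  // matching, so topic names inside the JSON are lowercased as well.
  def parseLowercasingEverything(value: Option[String]): StartingOffsets =
    value.map(_.trim.toLowerCase) match {
      case Some("latest")   => LatestOffsets
      case Some("earliest") => EarliestOffsets
      case Some(json)       => SpecificOffsets(json)
      case None             => LatestOffsets
    }

  // Hypothetical alternative: compare the keywords case-insensitively but
  // pass the JSON through untouched.
  def parsePreservingCase(value: Option[String]): StartingOffsets =
    value.map(_.trim) match {
      case Some(v) if v.equalsIgnoreCase("latest")   => LatestOffsets
      case Some(v) if v.equalsIgnoreCase("earliest") => EarliestOffsets
      case Some(json)                                => SpecificOffsets(json)
      case None                                      => LatestOffsets
    }

  def main(args: Array[String]): Unit = {
    // Topic "MyTopic", partition 0, start from earliest (-2).
    val option = Some("""{"MyTopic":{"0":-2}}""")
    println(parseLowercasingEverything(option)) // topic name comes out lowercased
    println(parsePreservingCase(option))        // topic name is unchanged
  }
}
```

With the first function the specified partition refers to "mytopic" while the consumer is assigned "MyTopic", which is the mismatch the quoted assertion reports.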