Date: Tue, 27 Jun 2017 22:24:00 +0000 (UTC)
From: "Matthias J. Sax (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Updated] (KAFKA-5528) Error while reading topic, offset, partition info from process method

     [ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-5528:
-----------------------------------
    Description:
We are encountering an {{IllegalStateException}} while trying to access {{context.topic()}} from the process function. The code is written in Scala and is being launched using sbt (Spring isn't involved).

Here's the code sketch:

{noformat}
class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T],
                                          decrypt: Boolean,
                                          config: Config)
  extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {

  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext)
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    val topic: String = this.context.topic()
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}
{noformat}

The exception is thrown only for the multi-consumer case (when the number of partitions for a topic > 1 and parallelism > 1).

  was:
We are encountering an IllegalStateException while trying to access context.topic() from the process function. The code is written in Scala and is being launched using sbt (Spring isn't involved).

Here's the code sketch:

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T],
                                          decrypt: Boolean,
                                          config: Config)
  extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {

  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext)
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    val topic: String = this.context.topic()
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}

The exception is thrown only for the multi-consumer case (when the number of partitions for a topic > 1 and parallelism > 1).

> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access {{context.topic()}} from the process function. The code is written in Scala and is being launched using sbt (Spring isn't involved).
> Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T],
>                                           decrypt: Boolean,
>                                           config: Config)
>   extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
>
>   private var hsmClient: HSMClient = _
>
>   override def init(processorContext: ProcessorContext): Unit = {
>     super.init(processorContext)
>     hsmClient = HSMClient(config).getOrElse(null)
>   }
>
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = {
>     val topic: String = this.context.topic()
>     val partition: Int = this.context.partition()
>     val offset: Long = this.context.offset()
>     val timestamp: Long = this.context.timestamp()
>     // business logic
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when the number of partitions for a topic > 1 and parallelism > 1).
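
The report does not say which of the four metadata reads in the process method fails, or under what state. As a minimal diagnostic sketch (not part of the original report, and not a fix), the reads could be routed through one helper that records which accessor threw. Everything named here is hypothetical: `RecordMetadataSource` is a stand-in trait that only mirrors the four accessor names used in the code sketch, and `RecordMetadata` and `capture` are illustrative names. In a real processor one would presumably adapt `this.context` to the trait; that wiring is an assumption and is not shown.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-in for the four accessors used in the reported sketch.
// This is NOT the Kafka Streams ProcessorContext interface.
trait RecordMetadataSource {
  def topic(): String
  def partition(): Int
  def offset(): Long
  def timestamp(): Long
}

// Immutable snapshot of the metadata of the record being processed.
final case class RecordMetadata(topic: String, partition: Int, offset: Long, timestamp: Long)

object RecordMetadata {
  // Runs one accessor; an IllegalStateException is re-wrapped so the message
  // names the accessor that failed. Other exceptions stay in the Failure as-is.
  private def read[A](name: String)(f: => A): Try[A] =
    Try(f).recoverWith {
      case e: IllegalStateException =>
        Failure(new IllegalStateException(s"context.$name() unavailable: ${e.getMessage}", e))
    }

  // Reads the four fields in order and stops at the first failure.
  def capture(ctx: RecordMetadataSource): Try[RecordMetadata] =
    for {
      t  <- read("topic")(ctx.topic())
      p  <- read("partition")(ctx.partition())
      o  <- read("offset")(ctx.offset())
      ts <- read("timestamp")(ctx.timestamp())
    } yield RecordMetadata(t, p, o, ts)
}

object CaptureDemo {
  // A source that behaves normally (values are made up for illustration).
  val healthy: RecordMetadataSource = new RecordMetadataSource {
    def topic(): String = "events"
    def partition(): Int = 0
    def offset(): Long = 42L
    def timestamp(): Long = 1498602240000L
  }

  // A source whose topic() throws, imitating the reported symptom.
  val failing: RecordMetadataSource = new RecordMetadataSource {
    def topic(): String = throw new IllegalStateException("no record in flight")
    def partition(): Int = 0
    def offset(): Long = 0L
    def timestamp(): Long = 0L
  }

  def main(args: Array[String]): Unit = {
    println(RecordMetadata.capture(healthy))
    println(RecordMetadata.capture(failing))
  }
}
```

Returning a `Try` keeps the business logic free to decide whether to skip, log, or rethrow, and the wrapped message makes it easier to report exactly which call failed in the multi-partition case.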