kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method
Date Mon, 03 Jul 2017 15:22:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072622#comment-16072622
] 

Matthias J. Sax commented on KAFKA-5528:
----------------------------------------

[~nravi] I guess the problem is, that you are using a single instance of {{GenericProcessor}}.
In {{addProcessor}} you return the same object on each call, but you need to return a new
instance each time. Can you try to change to {{addProcessor("PROCESS", () => new GenericProcessor[T](serDe,
decrypt, config), "SOURCE")}} ? Let us know if this fixes your problem or not.

We see this question regularly lately. Going to add an FAQ :)

> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access {{context.topic()}}
from process function. The code is written in Scala and is being launched using sbt (spring
isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean,
config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
>     super.init(processorContext) 
>     hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
>     val topic: String = this.context.topic() 
>     partition: Int = this.context.partition() 
>     val offset: Long = this.context.offset() 
>     val timestamp: Long = this.context.timestamp() 
>     // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of partitions for
a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message