kafka-jira mailing list archives

From "Nishkam Ravi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4344) Exception when accessing partition, offset and timestamp in processor class
Date Tue, 27 Jun 2017 06:56:02 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064379#comment-16064379 ]

Nishkam Ravi commented on KAFKA-4344:
-------------------------------------

[~guozhang] We are encountering the same error (for topic()). The code is written in Scala
and is launched using sbt (Spring isn't involved). Here's a sketch of the code:

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config)
    extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {

  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext)
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    val topic: String = this.context.topic() // exception thrown here
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}

The exception is thrown only in the multi-consumer case (when the number of partitions for a
topic > 1 and parallelism > 1). This should be easy to reproduce.
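
Until the root cause is found, one possible stopgap is to guard the metadata accessors so that an IllegalStateException is turned into a sentinel value, which matches the -1 the original reporter expected from the API docs. The following is only a sketch under stated assumptions: `RecordMetadata`, `SafeMetadata`, `orElse` and `describe` are hypothetical names introduced here, and the trait is a stand-in for the four accessors used in process(), not the Kafka Streams interface itself. It hides the symptom and does not explain why the context has no current record.

```scala
// Hypothetical stand-in for the four accessors called in process();
// this is NOT the Kafka Streams ProcessorContext interface.
trait RecordMetadata {
  def topic(): String
  def partition(): Int
  def offset(): Long
  def timestamp(): Long
}

object SafeMetadata {
  // Evaluate `read`; return `fallback` only if it throws IllegalStateException.
  // Any other exception still propagates to the caller.
  def orElse[A](fallback: A)(read: => A): A =
    try read
    catch { case _: IllegalStateException => fallback }

  // Collect all four values, using "<unknown>" / -1 as sentinels (an assumption).
  def describe(ctx: RecordMetadata): String = {
    val topic     = orElse("<unknown>")(ctx.topic())
    val partition = orElse(-1)(ctx.partition())
    val offset    = orElse(-1L)(ctx.offset())
    val timestamp = orElse(-1L)(ctx.timestamp())
    s"topic=$topic partition=$partition offset=$offset timestamp=$timestamp"
  }
}
```

In the processor above, the same idea would amount to something like `SafeMetadata.orElse("<unknown>")(this.context.topic())`, assuming the exception type is the IllegalStateException shown in the trace quoted below.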

> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: saiprasad mishra
>            Assignee: Guozhang Wang
>            Priority: Minor
>
> I have a Kafka Streams pipeline like the one below:
> source topic stream -> filter for null value -> map to make it keyed by id -> custom processor to mystore -> to another topic -> ktable
> I am hitting the exception below in a custom processor class if I try to access offset() or partition() or timestamp() from the ProcessorContext in the process() method. I was hoping it would return the partition and offset for the enclosing topic (in this case the source topic) that it's consuming from, or -1, based on the API docs.
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> 	at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
