kafka-jira mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5528) Error while reading topic, offset, partition info from process method
Date Tue, 27 Jun 2017 22:24:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-5528:
-----------------------------------
    Description: 
We are encountering an {{IllegalStateException}} when calling {{context.topic()}} from the
processor's {{process()}} method. The code is written in Scala and launched with sbt (Spring
isn't involved). Here is a sketch of the code:

{noformat}
class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config)
    extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext)
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    // Metadata of the record being processed; the IllegalStateException
    // is reported when context.topic() is accessed here.
    val topic: String = this.context.topic()
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}
{noformat}

The exception is thrown only in the multi-consumer case (when the topic has more than one
partition and parallelism > 1).

  was:
We are encountering an IllegalStateException while trying to access context.topic() from process
function. The code is written in Scala and is being launched using sbt (spring isn't involved).
Here's the code sketch:

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean,
config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
  private var hsmClient: HSMClient = _
  override def init(processorContext: ProcessorContext): Unit = { 
    super.init(processorContext) 
    hsmClient = HSMClient(config).getOrElse(null) 
  }
  override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
    val topic: String = this.context.topic() 
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset() 
    val timestamp: Long = this.context.timestamp() 
    // business logic 
  }
}
The exception is thrown only for the multi-consumer case (when number of partitions for a
topic > 1 and parallelism > 1). 


> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
