kafka-jira mailing list archives

From "fml2 (Jira)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-12328) Expose TaskId partition number
Date Sat, 20 Feb 2021 19:14:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

fml2 updated KAFKA-12328:
-------------------------
    Description: 
This question was posted [on Stack Overflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
and got an answer, but the solution is quite complicated, hence this ticket.

In my Kafka Streams application, I have a task that sets up a punctuator scheduled by wall-clock time.
The punctuator iterates over the entries of a store and does something with them, like this:
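{code:java}
// KeyValueStore, KeyValueIterator: org.apache.kafka.streams.state; KeyValue: org.apache.kafka.streams
// The String key/value types are placeholders for the store's actual types.
KeyValueStore<String, String> store =
        (KeyValueStore<String, String>) context().getStateStore("MyStore");

long count = 0;
// Store iterators hold resources and must be closed, hence try-with-resources.
try (KeyValueIterator<String, String> iter = store.all()) {
    while (iter.hasNext()) {
        KeyValue<String, String> entry = iter.next();
        // ... do something with the entry
        count++;
    }
}

// Print a summary (now): N entries processed
// Print a summary (wish): N entries processed in partition P
{code}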
Is it possible to find out which partition the punctuator operates on? The Javadoc for {{ProcessorContext.partition()}}
states that this method returns {{-1}} within punctuators.

I've read [Kafka Streams: Punctuate vs Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
and the answers there. I understand that a task is, in general, not tied to a particular
partition, but IMO an iterator should be.

How can I find out the partition?

Or is my assumption that a particular store iterator instance is tied to a partition wrong?

What I need it for: I'd like to include the partition number in some log messages. Right now,
I have several nearly identical log messages stating that the punctuator does this and that.
To make those messages "unique", I'd like to include the partition number in them.
Since I'm working with a single store here (which might be partitioned), I assume that every
single execution of the punctuator is bound to a single partition of that store.
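
A minimal, dependency-free sketch of the logging goal above, assuming (as the reporter does) that each punctuator run only sees the store shard owned by one task. The partition number is captured once when the processor is set up and reused in every summary. In a real processor it would have to be derived from the task id ({{ProcessorContext.taskId()}}); how {{TaskId}} exposes the partition is what this ticket is about, so that part is left out. The class {{PunctuatorSummarySketch}} and the {{Map}} standing in for {{MyStore}} are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Streams code: a Map stands in for the task's
// local state store, and the partition number is passed in once at setup time
// (in a real processor it would come from the task id, which is assumed here).
class PunctuatorSummarySketch {
    private final int taskPartition;        // captured once, e.g. in init()
    private final Map<String, Long> store;  // stand-in for "MyStore"

    PunctuatorSummarySketch(int taskPartition, Map<String, Long> store) {
        this.taskPartition = taskPartition;
        this.store = store;
    }

    // Mirrors the punctuator from the ticket: iterate over all entries,
    // then build a summary that includes the captured partition number.
    String punctuate() {
        long processed = 0;
        Iterator<Map.Entry<String, Long>> iter = store.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            // ... do something with the entry
            processed++;
        }
        return processed + " entries processed in partition " + taskPartition;
    }

    public static void main(String[] args) {
        Map<String, Long> shard = new TreeMap<>();
        shard.put("a", 1L);
        shard.put("b", 2L);
        PunctuatorSummarySketch punctuator = new PunctuatorSummarySketch(3, shard);
        System.out.println(punctuator.punctuate()); // prints "2 entries processed in partition 3"
    }
}
```

Capturing the partition at setup time rather than per record sidesteps {{ProcessorContext.partition()}} returning {{-1}} inside punctuators.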
  
 It would be cool if there were a method {{iterator.partition}} (or similar) to get this information.
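
The wished-for {{iterator.partition}} could be approximated in user code with a small decorator. This is a hypothetical sketch, not a Kafka Streams API: {{PartitionAwareIterator}} wraps a plain {{java.util.Iterator}} (wrapping a real {{KeyValueIterator}} would also require forwarding {{close()}}) and only remembers a partition number supplied by the caller, so it still depends on the task's partition being known.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical decorator illustrating the wished-for iterator.partition();
// NOT part of the Kafka Streams API. It delegates iteration to any Iterator
// and exposes the partition number it was constructed with.
final class PartitionAwareIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final int partition;

    PartitionAwareIterator(Iterator<T> delegate, int partition) {
        this.delegate = delegate;
        this.partition = partition;
    }

    /** The partition of the store shard this iterator walks over (caller-supplied). */
    int partition() {
        return partition;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next(); // the delegate throws NoSuchElementException when exhausted
    }

    public static void main(String[] args) {
        PartitionAwareIterator<String> iter =
                new PartitionAwareIterator<>(List.of("x", "y", "z").iterator(), 5);
        int n = 0;
        while (iter.hasNext()) {
            iter.next();
            n++;
        }
        // prints "3 entries processed in partition 5"
        System.out.println(n + " entries processed in partition " + iter.partition());
    }
}
```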


> Expose TaskId partition number
> ------------------------------
>
>                 Key: KAFKA-12328
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12328
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: fml2
>            Priority: Major
>              Labels: needs-kip
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)