kafka-dev mailing list archives

From "Armin Braun (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer
Date Wed, 22 Feb 2017 22:30:45 GMT

[ https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879170#comment-15879170 ]

Armin Braun edited comment on KAFKA-1895 at 2/22/17 10:30 PM:
--------------------------------------------------------------

[~hachikuji]

{quote}

How would you propose to expose RawRecordIterator in the consumer API? The problem is that
IO is currently driven through the poll() API, so introducing something else would be a bit
weird (though maybe there's a nice way to do it).

{quote}

I admit this is potentially not ideal in terms of nice API design, but I can see a relatively
short route to an implementation by adding a method like the one below to `org.apache.kafka.clients.consumer.Consumer`:

{code}
    public RawRecordIterator pollRaw(long timeout);
{code}
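
`RawRecordIterator` does not exist anywhere yet, so purely as a hypothetical sketch of the shape I have in mind (all names and signatures here are mine, nothing from the codebase):

{code}
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch only: a cursor over the raw fetched bytes that reuses
 * its underlying buffers, so iteration allocates nothing per record.
 */
public interface RawRecordIterator {

    /** Advance to the next raw record; false once the polled batch is drained. */
    boolean next();

    /** View of the current record's key bytes; valid only until the next call to next(). */
    ByteBuffer key();

    /** View of the current record's value bytes; valid only until the next call to next(). */
    ByteBuffer value();
}
{code}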
 
`pollRaw` would simply return the `RawRecordIterator`, which you could also use to back the current:

{code}
    /**
     * @see KafkaConsumer#poll(long)
     */
    public ConsumerRecords<K, V> poll(long timeout);
{code}

... then you could deserialize the records from the `RawRecordIterator` when instantiating
`ConsumerRecords` from it, and have that instantiation drain the full iterator so that accessors
like the following keep working:

{code}
    /**
     * Get just the records for the given partition
     * 
     * @param partition The partition to get records for
     */
    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
        if (recs == null)
            return Collections.emptyList();
        else
            return Collections.unmodifiableList(recs);
    }
{code}
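
To make that concrete, here is a rough (and purely hypothetical) sketch of how `poll` could be layered on `pollRaw`; the `topic()`, `partition()` and `offset()` accessors on the iterator are assumptions of mine, as are the deserializer fields:

{code}
// Hypothetical sketch, not existing code: poll() layered on pollRaw(),
// copying out of the reused buffers so ConsumerRecord stays immutable.
public ConsumerRecords<K, V> poll(long timeout) {
    RawRecordIterator raw = pollRaw(timeout);
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
    while (raw.next()) {
        // deserialize from copies, since the iterator's buffers are reused
        K key = keyDeserializer.deserialize(raw.topic(), copy(raw.key()));
        V value = valueDeserializer.deserialize(raw.topic(), copy(raw.value()));
        TopicPartition tp = new TopicPartition(raw.topic(), raw.partition());
        records.computeIfAbsent(tp, t -> new ArrayList<>())
               .add(new ConsumerRecord<>(raw.topic(), raw.partition(), raw.offset(), key, value));
    }
    return new ConsumerRecords<>(records);
}

// copy the reused buffer's contents into a fresh array before deserializing
private static byte[] copy(ByteBuffer buf) {
    byte[] bytes = new byte[buf.remaining()];
    buf.duplicate().get(bytes); // duplicate() so the iterator's own position is untouched
    return bytes;
}
{code}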

So no API change is needed here. I also think that moving to a reused set of `ByteBuffer`s abstracted
by `RawRecordIterator`, instead of the current approach of constantly allocating `ByteBuffer`s
wrapped in things like `org.apache.kafka.common.network.NetworkReceive`, would be significantly
faster by saving a lot of system calls: with pointers into the underlying buffers there is no
need to read the (size) header bytes separately, i.e. no setting up buffers (and hence `byte[]`)
after making an extra system call just to read 4 bytes.
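
To illustrate the kind of saving I mean (a toy sketch only, nothing to do with the actual `NetworkReceive` code): fill one reusable buffer with a single bulk read and parse the size prefixes in memory, instead of issuing a separate read per 4-byte header:

{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public final class BulkFrameReader {

    // reused across calls instead of allocating per response
    private final ByteBuffer pooled = ByteBuffer.allocateDirect(1 << 20);

    public void drainFrames(ReadableByteChannel channel) throws IOException {
        pooled.clear();
        channel.read(pooled); // one system call fills many size-prefixed frames at once
        pooled.flip();
        while (pooled.remaining() >= 4) {
            int size = pooled.getInt();           // size header parsed in memory, no extra read()
            if (pooled.remaining() < size) break; // partial frame, finish on the next fill
            ByteBuffer frame = pooled.slice();    // pointer into the pooled buffer, no new byte[]
            frame.limit(size);
            pooled.position(pooled.position() + size);
            // hand `frame` to the iterator / processing code here
        }
    }
}
{code}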

So, in a nutshell: just add `pollRaw`, which would always return a `RawRecordIterator`. Retain
all existing APIs and instantiate `ConsumerRecords` from the iterator in a (copying) way that
keeps `ConsumerRecord` immutable.

Hope this was understandable :) I understand there are a few tricky spots in the implementation
(though given the wealth of existing work on this, especially in Spark, it's not that hard
in 2017 imo), but from the API side I think the approach is sound. Plus, it doesn't require
any trickery where `ConsumerRecord` instances either become mutable or must be "passed back
somewhere" to make them reusable, while still offering a zero-GC raw API.

EDIT:

So the semantics I'm looking for would be:

{code}
while (/* whatever your use case desires */) {
    // the same iterator instance should be returned on every call, refilled with new content
    RawRecordIterator rawIterator = consumer.pollRaw(timeout);
    while (rawIterator.next()) {
        // do something with the key and value buffer contents
    }
}
{code}
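
(Since the iterator and its buffers are reused, anything the caller wants to keep beyond the next `pollRaw` call has to be copied out; the copying `ConsumerRecords` path above does exactly that.)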


> Investigate moving deserialization and decompression out of KafkaConsumer
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-1895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1895
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and deserializes them into ConsumerRecords, which are then handed back as the result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as commit() etc., which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed serialized MemoryRecords chunks and do the deserialization during iteration. This way you could scale this over a thread pool if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
