kafka-users mailing list archives

From Achanta Vamsi Subhash <achanta.va...@flipkart.com>
Subject Offset of last un-consumed message
Date Tue, 17 Jun 2014 09:23:53 GMT

I have a consumer group with multiple threads (high-level consumers) which
read from a topic.

I am also using a SimpleConsumer to read messages from a given start
offset. The code below gives me the offset of the last produced message.
How do I get the offset of the last un-consumed message instead?

    public long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                              long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition,
                new PartitionOffsetRequestInfo(whichTime, maxReads));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOGGER.error("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

Vamsi Subhash
