Date: Tue, 17 Jun 2014 14:54:29 +0530
Subject: Re: Offset of last un-consumed message
From: Achanta Vamsi Subhash
To: users@kafka.apache.org

Sorry, I meant the offset of the first un-consumed message.

On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash <achanta.vamsi@flipkart.com> wrote:
> Hi,
>
> I have a consumer group with multiple threads (high-level consumers) that
> read from a topic.
>
> I am also using a SimpleConsumer to read messages from a given start offset.
> The code below gives me the offset of the last produced message.
> How do I get the offset of the last un-consumed message?
>
> public long getLastOffset(SimpleConsumer consumer, String topic, int partition,
>                           long whichTime, String clientName) {
>     TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
>     Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>             new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>     // The second argument is the maximum number of offsets to return.
>     requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, maxReads));
>     kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>             requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>     OffsetResponse response = consumer.getOffsetsBefore(request);
>
>     if (response.hasError()) {
>         LOGGER.error("Error fetching offset data from the broker. Reason: "
>                 + response.errorCode(topic, partition));
>         return 0;
>     }
>     long[] offsets = response.offsets(topic, partition);
>     return offsets[0];
> }
>
> --
> Regards
> Vamsi Subhash

--
Regards
Vamsi Subhash