Return-Path: X-Original-To: apmail-kafka-users-archive@www.apache.org Delivered-To: apmail-kafka-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4889F11ED6 for ; Tue, 17 Jun 2014 09:24:23 +0000 (UTC) Received: (qmail 15891 invoked by uid 500); 17 Jun 2014 09:24:21 -0000 Delivered-To: apmail-kafka-users-archive@kafka.apache.org Received: (qmail 15858 invoked by uid 500); 17 Jun 2014 09:24:21 -0000 Mailing-List: contact users-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@kafka.apache.org Delivered-To: mailing list users@kafka.apache.org Received: (qmail 15840 invoked by uid 99); 17 Jun 2014 09:24:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 09:24:20 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of achanta.vamsi@flipkart.com designates 209.85.213.41 as permitted sender) Received: from [209.85.213.41] (HELO mail-yh0-f41.google.com) (209.85.213.41) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 09:24:18 +0000 Received: by mail-yh0-f41.google.com with SMTP id z6so5265523yhz.14 for ; Tue, 17 Jun 2014 02:23:53 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=flipkart.com; s=gapps; h=mime-version:date:message-id:subject:from:to:content-type; bh=Nmv+sDUsOyWIkYYsBZ/E5jYJ6pgXfZbzGmhmhQPrByg=; b=QxIfeHbUe+L3NOHuqhleEVXuq6X7Y9H2mq5GdK9EHTY7FSYJr60GLvWIvbvBHATC15 Sr34z9xdVAugc7o8OdSummqA+snlJZPXjPeqY8xS9bKig+56Shm+FeY1W8g/D/+Nu52K AFjZqy49GkVpiHTSLDf1o6USVur94Gyc/dwbU= X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:date:message-id:subject:from:to :content-type; bh=Nmv+sDUsOyWIkYYsBZ/E5jYJ6pgXfZbzGmhmhQPrByg=; b=fbVqjp4k5kb7YfgWs7qbG1MV8K4X+YyYrbFUe02Lp96JjkvF6XoLMHnhDhmUMrGXmN DuBV4i3Fnlm0Z93IVOci9SXKnBnbu5j1S0nIa1XTAc/SX+KyPKzWyg8HS9jr1mQla9lS DjFtgGIymdHcvRnGUc5v6f92nDapcfKXaaV5b/jUHtMKmZHq6mgs4oAN+PehJRmMf2k8 D3uxLe4NmJy55h6GUAhfsDQJSHKjYznIaW/mLsOMQYlS4lpYEPEwmC4RArzWC+dMbpAV fM72UIJVR/8eLh/ecrMs1efV94teFPWVcWAwFLfSaWWnEume6VOL2PRTNZYhKQUZbA4u Likg== X-Gm-Message-State: ALoCoQnwt2f1XOCKmET8p9Wt2eB30SsTp63/t5w4cVGN070FLFduGGKxy+7rFkF45xMdc2CB/ciM MIME-Version: 1.0 X-Received: by 10.236.152.2 with SMTP id c2mr42055130yhk.25.1402997033686; Tue, 17 Jun 2014 02:23:53 -0700 (PDT) Received: by 10.170.47.208 with HTTP; Tue, 17 Jun 2014 02:23:53 -0700 (PDT) Date: Tue, 17 Jun 2014 14:53:53 +0530 Message-ID: Subject: Offset of last un-consumed message From: Achanta Vamsi Subhash To: users@kafka.apache.org Content-Type: multipart/alternative; boundary=20cf304275c8d065fc04fc04b3b5 X-Virus-Checked: Checked by ClamAV on apache.org --20cf304275c8d065fc04fc04b3b5 Content-Type: text/plain; charset=UTF-8 Hi, I have a consumer group with multiple threads (high-level consumers) which read from a topic. I am also using a SimpleConsumer to read messages given a start offset. I am getting the offset as the last produced message using the below code. How to get the last un-consumed message? public long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map requestInfo = new HashMap(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, maxReads)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { LOGGER.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } -- Regards Vamsi Subhash --20cf304275c8d065fc04fc04b3b5--