From: Maximilian Michels
Date: Thu, 3 Dec 2015 19:07:38 +0100
Subject: Re: Read Kafka topic from the beginning
To: "user@flink.apache.org"

Hi Vladimir,

Did you pass the properties to the FlinkKafkaConsumer?

Cheers,
Max

On Thu, Dec 3, 2015 at 7:06 PM, Vladimir Stoyak wrote:
> Gave it a try, but it does not seem to help. Is it working for you?
>
> Thanks
>
>> On Dec 3, 2015, at 6:11 PM, Vladimir Stoyak wrote:
>>
>> As far as I know, "auto.offset.reset" only controls what to do if the offset is not available or out of bounds?
>>
>> Vladimir
>>
>>
>> On Thursday, December 3, 2015 5:58 PM, Maximilian Michels wrote:
>> Hi Vladimir,
>>
>> You may supply Kafka consumer properties when you create the FlinkKafkaConsumer.
>>
>> Properties props = new Properties();
>>
>> // start from largest offset - DEFAULT
>> props.setProperty("auto.offset.reset", "largest");
>> // start from smallest offset
>> props.setProperty("auto.offset.reset", "smallest");
>>
>> I don't think it is possible to start from a specific offset. The
>> offset is only unique per partition. You could modify the offsets in
>> the Zookeeper state but you really have to know what you're doing
>> then.
>>
>> Best regards,
>> Max
>>
>>
>>> On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak wrote:
>>> I see that Flink 0.10.1 now supports Keyed Schemas, which allows us to rely on Kafka topics set to "compact" retention for data persistence.
>>>
>>> In our topology we wanted to set up some topics with log compaction enabled and read each topic from the beginning when the topology starts or a component recovers. Does the current Kafka consumer implementation allow reading all messages in a topic from the beginning or from a specific offset?
>>>
>>> Thanks,
>>> Vladimir
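
For reference, here is a minimal sketch of the properties discussed in this thread. It rests on one assumption worth stating loudly: with the Kafka 0.8-style consumer, "auto.offset.reset" is only consulted when the consumer group has no committed offset (or the committed offset is out of range), so the sketch pairs "smallest" with a fresh "group.id". The class name KafkaOffsetProps, the helper fromBeginning, and the host/port values are hypothetical and only for illustration.

```java
import java.util.Properties;
import java.util.UUID;

public class KafkaOffsetProps {

    // Hypothetical helper: builds consumer properties intended to read a topic
    // from its earliest available offset.
    public static Properties fromBeginning(String bootstrapServers,
                                           String zookeeperConnect,
                                           String groupIdPrefix) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("zookeeper.connect", zookeeperConnect);
        // Assumption: a group id that has never committed offsets, so that
        // "auto.offset.reset" actually takes effect.
        props.setProperty("group.id", groupIdPrefix + "-" + UUID.randomUUID());
        // Start from the smallest (earliest) offset when none is committed.
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with your own cluster.
        Properties props = fromBeginning("localhost:9092", "localhost:2181", "replay");
        System.out.println(props.getProperty("auto.offset.reset"));
        // The resulting Properties object is what would be handed to the
        // FlinkKafkaConsumer constructor, together with the topic name and a
        // deserialization schema (not shown here, to keep the sketch free of
        // Flink dependencies).
    }
}
```

The trade-off of a fresh group id is that nothing is resumed from previously committed offsets, which may be what you want for a compacted topic that should be fully re-read on startup.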