kafka-dev mailing list archives

From SenthilKumar K <senthilec...@gmail.com>
Subject Re: Kafka Read Data from All Partition Using Key or Timestamp
Date Thu, 25 May 2017 19:58:43 GMT
Thanks a lot, Hans. Using the KafkaConsumer API
(https://gist.github.com/senthilec566/16e8e28b32834666fea132afc3a4e2f9) I can
now query the data by timestamp.

It worked!
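For anyone hitting this thread later without opening the gist: the key call is offsetsForTimes(). A minimal standalone sketch against the 0.10.2 new-consumer API — the broker address, topic name, and timestamp below are placeholders, not values from the gist, and error handling is omitted:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimestampLookup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        long searchTs = 1495718896912L; // placeholder CreateTime in epoch millis

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask for the same timestamp in every partition of the topic.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("test")) {
                query.put(new TopicPartition(p.topic(), p.partition()), searchTs);
            }
            // For each partition: the earliest offset whose timestamp is
            // >= searchTs, or null if no such message exists.
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(query);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet()) {
                if (e.getValue() != null) {
                    System.out.println(e.getKey() + " -> seek to " + e.getValue().offset());
                }
            }
        }
    }
}
```

From there it is assign() plus seek() to the returned offset before polling.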


Now another question: how do I achieve parallelism when reading the data?

Example :  topic : test
                  partitions : 4

KafkaConsumer lets me look up a timestamp-based offset in each partition.
Right now my code does the following:
1) Fetch the number of partitions
2) Create a ForkJoinPool
3) Submit one task per partition to the ForkJoinPool
4) Combine the results
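In code, those four steps come out roughly like this — a trimmed sketch of the approach, not the exact production code; the topic name, broker address, and search timestamp are placeholder values:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class ParallelTimestampSearch {
    static final String TOPIC = "test";            // placeholder topic
    static final long SEARCH_TS = 1495718896912L;  // placeholder timestamp

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // One task = one partition = one short-lived consumer.
    static Callable<List<ConsumerRecord<String, String>>> taskFor(final int partition) {
        return () -> {
            try (KafkaConsumer<String, String> c = new KafkaConsumer<>(consumerProps())) {
                TopicPartition tp = new TopicPartition(TOPIC, partition);
                c.assign(Collections.singletonList(tp));
                OffsetAndTimestamp oat =
                        c.offsetsForTimes(Collections.singletonMap(tp, SEARCH_TS)).get(tp);
                List<ConsumerRecord<String, String>> out = new ArrayList<>();
                if (oat != null) {
                    c.seek(tp, oat.offset());            // jump to the timestamp
                    for (ConsumerRecord<String, String> r : c.poll(1000)) out.add(r);
                }
                return out;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        int partitions;
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(consumerProps())) {
            partitions = probe.partitionsFor(TOPIC).size();          // step 1
        }
        ForkJoinPool pool = new ForkJoinPool(partitions);            // step 2
        List<Callable<List<ConsumerRecord<String, String>>>> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) tasks.add(taskFor(p));  // step 3
        List<ConsumerRecord<String, String>> combined = new ArrayList<>();
        for (Future<List<ConsumerRecord<String, String>>> f : pool.invokeAll(tasks)) {
            combined.addAll(f.get());                                // step 4
        }
        pool.shutdown();
    }
}
```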

Each task creates its own consumer and reads the data, so with 4 partitions
there are 4 consumers, and the observed search time is higher than expected.
A single consumer fares no better, since it has to read each of the 4
partitions in turn and join the results.

How can I implement this efficiently, i.e. read data from all partitions in
a single request? Whichever way gives the best performance, I would opt for
it :-)

Please advise!

Cheers,
Senthil


On Thu, May 25, 2017 at 8:30 PM, Hans Jespersen <hans@confluent.io> wrote:

> The timeindex was added in 0.10 so I think you need to use the new
> Consumer API to access this functionality. Specifically you should call
> offsetsForTimes()
>
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/
> clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)
>
> -hans
>
> > On May 25, 2017, at 6:39 AM, SenthilKumar K <senthilec566@gmail.com>
> wrote:
> >
> > I did an experiment on searching messages using timestamps.
> >
> > Step 1: Used a producer with CreateTime (CT).
> > Step 2: Verified whether it is reflected in Kafka:
> >              00000000000000000000.index      00000000000000000000.log
> >  00000000000000000000.timeindex
> >        All three files are on disk, so the time index seems to be working.
> >
> > Step 3: Let's look into data
> >        offset: 121 position: 149556 *CreateTime*: 1495718896912 isvalid:
> > true payloadsize: 1194 magic: 1 compresscodec: NONE crc: 1053048980
> > keysize: 8
> >
> >              Looks good ..
> > Step 4 :  Check .timeindex file .
> >              timestamp: 1495718846912 offset: 116
> >              timestamp: 1495718886912 offset: 120
> >              timestamp: 1495718926912 offset: 124
> >              timestamp: 1495718966912 offset: 128
> >
> > So, all set for querying data by timestamp?
> >
> > Kafka version : kafka_2.11-0.10.2.1
> >
> > Here is the code i'm using to search query -->
> > https://gist.github.com/senthilec566/bc8ed1dfcf493f0bb5c473c50854dff9
> >
> > requestInfo.put(topicAndPartition,
> >     new PartitionOffsetRequestInfo(queryTime, 1));
> >
> > If I pass my own timestamp, I always get zero results.
> > The same question was asked here too:
> > https://stackoverflow.com/questions/31917134/how-to-use-unix-timestamp-to-get-offset-using-simpleconsumer-api
> >
> >
> > Also, I noticed the errors below in the index file:
> >
> > Found timestamp mismatch in
> > :/home/user/kafka-logs/topic-0/00000000000000000000.timeindex
> >
> >  Index timestamp: 0, log timestamp: 1495717686913
> >
> > Found out of order timestamp in
> > :/home/user/kafka-logs/topic-0/00000000000000000000.timeindex
> >
> >  Index timestamp: 0, Previously indexed timestamp: 1495719406912
> >
> > Not sure what is missing here :-( ... Please advise!
> >
> >
> > Cheers,
> > Senthil
> >
> > On Thu, May 25, 2017 at 3:36 PM, SenthilKumar K <senthilec566@gmail.com>
> > wrote:
> >
> >> Thanks a lot Mayuresh. I will look into SearchMessageByTimestamp feature
> >> in Kafka ..
> >>
> >> Cheers,
> >> Senthil
> >>
> >> On Thu, May 25, 2017 at 1:12 PM, Mayuresh Gharat <
> >> gharatmayuresh15@gmail.com> wrote:
> >>
> >>> Hi Senthil,
> >>>
> >>> Kafka does allow search message by timestamp after KIP-33 :
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+
> >>> Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-S
> >>> earchmessagebytimestamp
> >>>
> >>> The new consumer does provide you a way to get offsets by timestamp.
> You
> >>> can use these offsets to seek to that offset and consume from there.
> So if
> >>> you want to consume between a range you can get the start and end
> offset
> >>> based on the timestamps, seek to the start offset and consume and
> process
> >>> the data till you reach the end offset.
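
A minimal sketch of the range read described above — the helper name, poll timeout, and bounds handling are assumptions, not from the thread:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class RangeRead {
    // Consume records from one partition between two timestamps by first
    // translating both bounds into offsets with offsetsForTimes().
    static List<ConsumerRecord<String, String>> readRange(
            KafkaConsumer<String, String> consumer,
            TopicPartition tp, long startTs, long endTs) {
        OffsetAndTimestamp start =
                consumer.offsetsForTimes(Collections.singletonMap(tp, startTs)).get(tp);
        OffsetAndTimestamp end =
                consumer.offsetsForTimes(Collections.singletonMap(tp, endTs)).get(tp);
        List<ConsumerRecord<String, String>> out = new ArrayList<>();
        if (start == null) return out;             // nothing at/after startTs
        long endOffset = (end == null) ? Long.MAX_VALUE : end.offset();
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, start.offset());
        boolean done = false;
        while (!done) {
            ConsumerRecords<String, String> batch = consumer.poll(1000);
            if (batch.isEmpty()) break;            // caught up to the log end
            for (ConsumerRecord<String, String> r : batch.records(tp)) {
                if (r.offset() >= endOffset) { done = true; break; }
                out.add(r);
            }
        }
        return out;
    }
}
```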
> >>>
> >>> But these timestamps are either CreateTime(when the message was created
> >>> and you will have to specify this when you do the send()) or
> >>> LogAppendTime(when the message was appended to the log on the kafka
> broker)
> >>> : https://kafka.apache.org/0101/javadoc/org/apache/kafka/clien
> >>> ts/producer/ProducerRecord.html
> >>>
> >>> Kafka does not look at the fields in your data (key/value) when giving
> >>> the data back to you. What I meant was that it will not look at the
> >>> timestamp you specify inside the actual data payload.
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>> On Thu, May 25, 2017 at 12:43 PM, SenthilKumar K <
> senthilec566@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello Dev Team, please let me know if there is any option to read data
> >>>> from Kafka (all partitions) using a timestamp. Also, can we set a custom
> >>>> offset value on messages?
> >>>>
> >>>> Cheers,
> >>>> Senthil
> >>>>
> >>>> On Wed, May 24, 2017 at 7:33 PM, SenthilKumar K <
> senthilec566@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi All ,  We have been using Kafka for our use case, which helps in
> >>>>> delivering real-time raw logs. I have a requirement to fetch data from
> >>>>> Kafka by using an offset.
> >>>>>
> >>>>> DataSet Example :
> >>>>> {"access_date":"2017-05-24 13:57:45.044","format":"json",
> >>>>> "start":"1490296463.031"}
> >>>>> {"access_date":"2017-05-24 13:57:46.044","format":"json",
> >>>>> "start":"1490296463.031"}
> >>>>> {"access_date":"2017-05-24 13:57:47.044","format":"json",
> >>>>> "start":"1490296463.031"}
> >>>>> {"access_date":"2017-05-24 13:58:02.042","format":"json",
> >>>>> "start":"1490296463.031"}
> >>>>>
> >>>>> Above JSON data will be stored in Kafka..
> >>>>>
> >>>>> Key --> access_date in epoch format
> >>>>> Value --> whole JSON.
> >>>>>
> >>>>> Data Access Pattern:
> >>>>>   1) Get me the last 2 minutes of data?
> >>>>>   2) Get me records between 2017-05-24 13:57:42:00 and 2017-05-24
> >>>>> 13:57:44:00?
> >>>>>
> >>>>> How to achieve this in Kafka?
> >>>>>
> >>>>> I tried using SimpleConsumer, but it expects a partition, and I am
> >>>>> not sure SimpleConsumer would match our requirement...
> >>>>>
> >>>>> Appreciate your help!
> >>>>>
> >>>>> Cheers,
> >>>>> Senthil
> >>>
> >>>
> >>>
> >>> --
> >>> -Regards,
> >>> Mayuresh R. Gharat
> >>> (862) 250-7125
> >>
> >>
>
