kafka-dev mailing list archives

From Yui Yoi <shalosh...@gmail.com>
Subject Re: A question about kafka streams API
Date Wed, 12 Sep 2018 11:59:57 GMT
Hi Adam,
Thanks a lot for the rapid response, it did help!

Let me ask one more simple question, though: can I make a streams application
get stuck on an invalid message and stop consuming any further messages?

Thanks again

On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <adam.bellemare@gmail.com>
wrote:

> Hi Yui Yoi
>
> Preface: I am not familiar with the spring framework.
>
> "Earliest" when it comes to consuming from Kafka means, "Start reading from
> the first message in the topic, *if there is no offset stored for that
> consumer group*". It sounds like you are expecting it to re-read each
> message whenever a new message comes in. This is not going to happen, as
> there will be a committed offset and "earliest" will no longer be used. If
> you were to use "latest" instead, a consumer started without a valid
> committed offset would begin at the end of the topic and receive only
> messages produced after it starts.
>
> Now, if you are using the same consumer group each time you run the
> application (which it seems is true, as you have "test-group" hardwired in
> your application.yml), and you do not tear down your local cluster and
> clear out its state, you will indeed see the behaviour you describe.
> Remember that Kafka is durable, and maintains the offsets when the
> individual applications go away. So you are probably seeing this:
>
> 1) start application instance 1. It realizes it has no offset when it tries
> to register as a consumer on the input topic, so it creates a new consumer
> entry for "earliest" for your consumer group.
> 2) send message "asd"
> 3) application instance 1 receives "asd", processes it, and updates the
> offset (offset head = 1)
> 4) Terminate instance 1
> 5) Start application instance 2. It correctly detects that consumer group
> "test-group" already has a committed offset and uses that offset as its
> starting point.
> 6) send message "{}"
> 7) application instance 2 receives "{}", processes it, and updates the
> offset (offset head = 2)
> *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> telling the Kafka cluster that it belongs to the same consumer group as
> application 1.
>
> Hope this helps,
>
> Adam
>
> On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <shalosh233@gmail.com> wrote:
>
> > TL;DR:
> > my streams application skips uncommitted messages
> >
> > Hello,
> > I'm using the streams API via the Spring framework and seeing some weird
> > behavior that I would like an explanation for.
> > First of all: the attached zip is my test project. I used the Kafka CLI to
> > run a localhost broker and ZooKeeper.
> >
> > what is happening is as follows:
> > 1. I send an invalid message, such as "asd", and my consumer shows lag and
> > an error message, as expected.
> > 2. I send a valid message, such as "{}", but instead of rereading the first
> > message, as I would expect from an application configured with "earliest",
> > my application reads the latest message, commits it, and ignores the one
> > in error, so I have no lag!
> > 3. When I start my application while there are uncommitted messages, it
> > reads the FIRST uncommitted message, as if it IS an "earliest" configured
> > application!
> >
> > Your documentation promises "at least once" behavior, but in the case
> > described in item 2 my application does not receive those messages even
> > once (as I said, those messages are uncommitted).
> >
> > My guess is that it has something to do with the stream's cache... I would
> > very much like an explanation, or even a solution.
> >
> > I'm turning to you as a last resort, after long weeks of research and
> > experiments
> >
> > Thanks a lot
> >
>

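For anyone reading this thread later: the offset behaviour Adam describes in
his steps 1-7 can be sketched with a small toy model. This is not Kafka client
code. The function name starting_offset, the dict of committed offsets, and
the "other-group" name are all made up for illustration, and a single
partition is assumed.

```python
# Toy model of how a consumer picks its starting offset (not real Kafka code).

def starting_offset(committed, group, log_end, reset="earliest"):
    """Return the offset a consumer in `group` starts reading from."""
    if group in committed:
        # A committed offset always wins; the reset policy is ignored.
        return committed[group]
    # No committed offset for this group: fall back to the reset policy.
    return 0 if reset == "earliest" else log_end


topic = []       # the topic's log (one partition assumed)
committed = {}   # consumer group -> next offset to read

# Steps 1-4: instance 1 has no committed offset, so "earliest" applies.
pos = starting_offset(committed, "test-group", len(topic))
topic.append("asd")
pos += 1                         # "asd" is received and processed
committed["test-group"] = pos    # committed offset is now 1

# Steps 5-7: instance 2 joins the same group and resumes at offset 1.
resume = starting_offset(committed, "test-group", len(topic))
topic.append("{}")
seen_by_instance_2 = topic[resume:]

# A group with no committed offset would replay the topic from the start.
fresh = starting_offset(committed, "other-group", len(topic))
seen_by_new_group = topic[fresh:]
```

In this model instance 2 never sees "asd", which matches Adam's note, while a
consumer group that has never committed an offset reads both messages.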