samza-dev mailing list archives

From: Yi Pan <nickpa...@gmail.com>
Subject: Re: question on commit on changelog
Date: Tue, 04 Aug 2015 16:54:30 GMT

Hi, Chen,

So, is your goal to improve the throughput to the changelog topic, or to reduce
the size of the changelog topic? If you are targeting the latter and your
KV-store truly is as large as the input log, I don't see how the changelog
could be made any smaller. In a lot of use cases, users only need to retain a
certain *recent* time period of the input log. In that case, you can choose to
periodically purge the expired records in the KV-store to reduce the size of
both the KV-store and the changelog.

Regards,
-Yi
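A minimal sketch of the periodic purge suggested above, assuming every stored value carries the timestamp of the input record that produced it. `LoggedKVStore`, `purge_expired`, and `compact` are hypothetical names, not Samza APIs: the store mirrors each put/delete into an in-memory list that stands in for the changelog topic, and `compact` approximates what log compaction eventually keeps. In an actual Samza job, a purge like this could run periodically, e.g. from `WindowableTask.window()` (scheduled via `task.window.ms`), iterating the store with `all()` and calling `delete()`.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# A changelog record is (key, value); a value of None is a tombstone (delete).
Record = Tuple[str, Optional[Tuple[int, str]]]


@dataclass
class LoggedKVStore:
    """Hypothetical stand-in for a changelog-backed KV store: every put/delete
    is applied locally and also appended to an in-memory 'changelog'."""
    data: Dict[str, Tuple[int, str]] = field(default_factory=dict)
    changelog: List[Record] = field(default_factory=list)

    def put(self, key: str, ts: int, value: str) -> None:
        self.data[key] = (ts, value)
        self.changelog.append((key, (ts, value)))

    def delete(self, key: str) -> None:
        self.data.pop(key, None)
        self.changelog.append((key, None))  # tombstone


def purge_expired(store: LoggedKVStore, now: int, retention: int) -> int:
    """Delete every entry older than `retention`; return how many were purged."""
    expired = [k for k, (ts, _) in store.data.items() if now - ts > retention]
    for key in expired:
        store.delete(key)
    return len(expired)


def compact(changelog: List[Record]) -> List[Record]:
    """Keep only the latest record per key and drop tombstones. This is roughly
    what log compaction converges to; Kafka keeps tombstones for a while
    (delete.retention.ms) before removing them."""
    latest: Dict[str, Optional[Tuple[int, str]]] = {}
    for key, value in changelog:
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]


store = LoggedKVStore()
for i in range(10):
    store.put(f"k{i}", ts=i, value=f"v{i}")

print(purge_expired(store, now=10, retention=3))        # 7 (timestamps 0..6)
print(len(store.data), len(compact(store.changelog)))  # 3 3
```

Note that purging bounds the *retained* (compacted) size of the changelog, not its write rate: each delete is itself one more changelog message (a tombstone).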

On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <chen.song.82@gmail.com> wrote:

> Thanks, Yan.
>
> Very good explanation of 1).
>
> For 2), I understand that users can tune the batch size of the Kafka
> producer. However, that doesn't change the number of messages sent to the
> changelog topic. In our case, we process a high-volume log (1.5MM
> records/second) and update the kv store for each message, which causes the
> changelog to grow to the same size as the input log. Even with compaction
> turned on for the changelog, this is not very scalable. I am wondering if
> there is a way to mitigate this problem.
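Chen's concern can be made concrete with a minimal sketch of the in-memory buffering asked about in question 2: store writes are held per key, and only the latest value of each dirty key is sent to the changelog on flush. `CoalescingWriter` and `changelog_messages` are hypothetical names for illustration, not a Samza API. The counts show that this only reduces changelog traffic when the same keys are updated repeatedly within a flush interval; with all-distinct keys, the changelog still gets one message per input record.

```python
from typing import Dict, List, Tuple


class CoalescingWriter:
    """Hypothetical write buffer: holds the latest value per key in memory and
    sends one changelog message per *distinct* dirty key on each flush."""

    def __init__(self, flush_every: int) -> None:
        self.flush_every = flush_every      # flush after this many writes
        self.dirty: Dict[str, str] = {}     # key -> latest buffered value
        self.writes_since_flush = 0
        self.changelog: List[Tuple[str, str]] = []  # stands in for the topic

    def put(self, key: str, value: str) -> None:
        self.dirty[key] = value             # overwrites an earlier buffered write
        self.writes_since_flush += 1
        if self.writes_since_flush >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        self.changelog.extend(self.dirty.items())
        self.dirty.clear()
        self.writes_since_flush = 0


def changelog_messages(keys: List[str], flush_every: int) -> int:
    """Write one value per key in `keys` and count the changelog messages."""
    writer = CoalescingWriter(flush_every)
    for i, key in enumerate(keys):
        writer.put(key, f"v{i}")
    writer.flush()  # flush whatever is left at the end
    return len(writer.changelog)


hot_keys = [f"user{i % 10}" for i in range(1000)]   # 10 keys, updated repeatedly
distinct_keys = [f"user{i}" for i in range(1000)]   # every update hits a new key

print(changelog_messages(hot_keys, flush_every=100))       # 100
print(changelog_messages(distinct_keys, flush_every=100))  # 1000
```

Any buffered write that has not been flushed is lost if the container dies, so in a design like this the buffer must be flushed before the corresponding input offsets are checkpointed.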
>
>
> On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang724@gmail.com> wrote:
>
> > Hi Chen Song,
> >
> > There are two different concepts: *checkpoint* and *changelog*. The
> > checkpoint is for the offsets of the messages, while the changelog is for
> > the kv-store. The code snippet you show is for the checkpoint, not for the
> > changelog.
> >
> > {quote}
> > 1. When implementing our Samza task, does each call of the process method
> > trigger a call to TaskInstance.commit?
> > {quote}
> >
> > TaskInstance.commit triggers the *checkpoint*. It is triggered every
> > task.commit.ms (default 60000 ms). The code is here
> > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
> > Basically, the RunLoop class calls the commit method, but the commit only
> > actually happens once every configured interval.
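A minimal sketch of the behavior described above, using a fake clock so the result is deterministic. `RunLoopSketch` is a hypothetical, heavily simplified stand-in for Samza's RunLoop: a commit is attempted after every processed message, but a checkpoint is only written once `commit_ms` (playing the role of `task.commit.ms`) has elapsed since the last one. In this sketch, 20 calls to `process` produce only 3 checkpoints.

```python
from typing import Callable, List, Optional


class RunLoopSketch:
    """Hypothetical, simplified run loop: commit() is attempted after every
    message, but it only checkpoints once per commit interval."""

    def __init__(self, commit_ms: int, clock: Callable[[], int]) -> None:
        self.commit_ms = commit_ms          # plays the role of task.commit.ms
        self.clock = clock                  # returns the current time in ms
        self.last_commit_ms = clock()
        self.last_offset: Optional[int] = None
        self.checkpoints: List[int] = []    # offsets that were checkpointed

    def process(self, offset: int) -> None:
        self.last_offset = offset           # "process" the message
        self.commit()                       # attempted on every iteration

    def commit(self) -> None:
        now = self.clock()
        if now - self.last_commit_ms < self.commit_ms:
            return                          # too soon: no checkpoint yet
        self.checkpoints.append(self.last_offset)
        self.last_commit_ms = now


now_ms = [0]
loop = RunLoopSketch(commit_ms=60_000, clock=lambda: now_ms[0])
for offset in range(20):
    now_ms[0] += 10_000                     # one message every 10 seconds
    loop.process(offset)

print(loop.checkpoints)  # [5, 11, 17]
```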
> >
> > If you are talking about the *changelog*, it's not controlled by the commit
> > method. Instead, every put/delete calls the "send" method
> > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>
> > of the system producer (code is here
> > <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
> > How often "send" actually *sends* to the broker (e.g. Kafka) depends on
> > your producer's configuration. For example, in Kafka you can have the
> > producer send messages in batches (async mode) or one message at a time
> > (sync mode). In other words, it is left to the system to decide how to
> > handle the "send" method.
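The point about producer batching can be illustrated with a hypothetical producer (not the Kafka or Samza producer API) that buffers `send()` calls and makes one broker request per batch; a `batch_size` of 1 behaves like sync sending. Batching reduces the number of requests, but every message still reaches the broker, which is why it does not shrink the changelog itself.

```python
from typing import List


class BatchingProducerSketch:
    """Hypothetical producer: send() only buffers; a 'request' to the broker
    happens when the buffer reaches batch_size or on an explicit flush()."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.buffer: List[str] = []
        self.requests = 0                   # broker round trips
        self.delivered: List[str] = []      # messages the broker received

    def send(self, message: str) -> None:
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        self.requests += 1
        self.delivered.extend(self.buffer)
        self.buffer.clear()


for batch_size in (1, 4):                   # 1 behaves like "sync" sending
    producer = BatchingProducerSketch(batch_size)
    for i in range(10):
        producer.send(f"put k{i}")          # e.g. one changelog entry per put
    producer.flush()
    print(batch_size, producer.requests, len(producer.delivered))
# 1 10 10
# 4 3 10
```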
> >
> >
> > {quote}
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV store,
> > and we are very concerned about the changelog size, as in the worst case
> > the changelog will grow as fast as the input log.
> > {quote}
> >
> > If you are talking about the checkpoint, you can change task.commit.ms.
> >
> > If you are thinking of the changelog (kv-store), you can change the
> > producer's config to batch several changes before sending them to the
> > broker.
> >
> > I think folks in the community with more operational experience will be
> > able to tell you what the best practice is.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song.82@gmail.com> wrote:
> >
> > > We are trying to understand the order of commits when processing each
> > > message in a Samza job.
> > >
> > > T1: input offset commit
> > > T2: changelog commit
> > > T3: output commit
> > >
> > > By looking at the code snippet in
> > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171>,
> > > my understanding is that for each input message, Samza always sends the
> > > update message to the changelog, sends the output message, and then
> > > commits the input offset. At a high level, this makes sense to me in terms
> > > of at-least-once processing.
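A minimal sketch of why this ordering gives at-least-once semantics. `TaskSketch` and `run` are hypothetical; each commit covers all messages processed since the previous commit, flushing the changelog (T2) and the output (T3) before checkpointing the input offset (T1). A crash is simulated after the flushes but before the checkpoint, so on restart the task resumes from the old checkpoint: messages are processed twice, but none are lost.

```python
from typing import List


class TaskSketch:
    """Hypothetical task with buffered changelog/output writes and an input
    checkpoint. commit() flushes in the order T2, T3, then T1."""

    def __init__(self) -> None:
        self.pending_changelog: List[str] = []
        self.pending_output: List[str] = []
        self.changelog_topic: List[str] = []
        self.output_topic: List[str] = []
        self.checkpoint = 0                 # next input offset to read

    def process(self, message: str) -> None:
        self.pending_changelog.append(f"put {message}")
        self.pending_output.append(message.upper())

    def commit(self, next_offset: int, crash_before_checkpoint: bool = False) -> None:
        self.changelog_topic.extend(self.pending_changelog)  # T2: changelog
        self.pending_changelog.clear()
        self.output_topic.extend(self.pending_output)        # T3: output
        self.pending_output.clear()
        if crash_before_checkpoint:
            raise RuntimeError("container died before checkpointing")
        self.checkpoint = next_offset                         # T1: input offset


def run(task: TaskSketch, inputs: List[str], crash: bool) -> None:
    for message in inputs[task.checkpoint:]:                  # resume from checkpoint
        task.process(message)
    task.commit(len(inputs), crash_before_checkpoint=crash)


inputs = ["a", "b", "c"]
task = TaskSketch()
try:
    run(task, inputs, crash=True)
except RuntimeError:
    pass                                    # restart with the same topics/checkpoint
run(task, inputs, crash=False)

print(task.output_topic)  # ['A', 'B', 'C', 'A', 'B', 'C']
print(task.checkpoint)    # 3
```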
> > >
> > > Specifically, we have two dumb questions:
> > >
> > > 1. When implementing our Samza task, does each call of the process method
> > > trigger a call to TaskInstance.commit?
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV store,
> > > and we are very concerned about the changelog size, as in the worst case
> > > the changelog will grow as fast as the input log.
> > >
> > > Chen
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>
