beam-user mailing list archives

From Maurizio Sambati
Subject Fwd: Stateful processing with session window
Date Fri, 09 Feb 2018 15:46:31 GMT
Hi everyone,

I'm trying to write a simple pipeline to experiment with both stateful
processing and session windows.

I have an event stream; each event has a timestamp and a session key. I
want to group by session and enrich all events using a common state of
the session. In this case I'm just replacing each event with an incremental
counter.

So, let's say I have a source that outputs an event every second and my
stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a]. (I'm writing only
the session key, since the value is irrelevant to the issue.)

I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a,
3>, ...]
(actually the order is not important)
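
To make the target concrete, here is a minimal sketch of the enrichment in plain Java, with no Beam involved. The names `ExpectedOutput` and `enumerate` are only illustrative, and the sketch assumes every occurrence of a key belongs to the same session:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedOutput {
    // Pairs each key with the number of times that key has been seen before.
    public static List<String> enumerate(List<String> keys) {
        Map<String, Long> counters = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            long index = counters.getOrDefault(key, 0L);
            out.add("<" + key + ", " + index + ">");
            counters.put(key, index + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "a", "b", "a", "a", "c");
        System.out.println(enumerate(stream));
        // prints [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, <c, 0>]
    }
}
```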

Unfortunately, my code doesn't work as I expected, and I can't understand
why. (To be honest, I haven't found many resources on the topic.) What I
actually get is something like:

a, 0
a, 1
b, 0
a, 0    <-- ???
a, 2,   <---???
c, 0,

That makes me wonder whether I have actually understood how the state is
related to the key-window pair, or whether I have just misunderstood how
windowing/triggering works.
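
To make the question concrete, here is the scoping I have in mind, sketched in plain Java with no Beam involved. It assumes state is stored once per key-window pair, so the same key in a different window starts from a fresh cell. `ScopedCounter`, `next`, and the window labels `w1`/`w2` are hypothetical names. The sketch does not model session merging or triggering, so it is only one possible reading, not a diagnosis:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

public class ScopedCounter {
    // One counter cell per (key, window) pair.
    private final Map<Map.Entry<String, String>, Integer> cells = new HashMap<>();

    // Returns the current count for the pair, then increments it.
    public int next(String key, String window) {
        Map.Entry<String, String> cell = new SimpleEntry<>(key, window);
        int current = cells.getOrDefault(cell, 0);
        cells.put(cell, current + 1);
        return current;
    }

    public static void main(String[] args) {
        ScopedCounter state = new ScopedCounter();
        System.out.println("a, " + state.next("a", "w1")); // a, 0
        System.out.println("a, " + state.next("a", "w1")); // a, 1
        System.out.println("b, " + state.next("b", "w1")); // b, 0
        System.out.println("a, " + state.next("a", "w2")); // a, 0 (different window, fresh cell)
        System.out.println("a, " + state.next("a", "w1")); // a, 2
    }
}
```

If an element of session `a` were assigned to a window different from the others, a sequence like the one above would follow.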

My pipeline looks something like:


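 .apply(MapElements.via(new ParseTableRowJson()))
 .apply(new AugmentEvents())
 .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
   @ProcessElement
   public void processElement(ProcessContext c) {
     // Print each enriched event as "key: counter".
     System.out.println(c.element().getKey() + ": " + c.element().getValue());
   }
 }));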

static class AugmentEvents extends PTransform<PCollection<TableRow>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
    return input
      // Key each event by its session id and assign its event timestamp.
      .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
      .apply(new ComputeSessions());
  }
}

static class ComputeSessions extends PTransform<PCollection<KV<String, TableRow>>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
    return events
      .apply(Window.<KV<String, TableRow>>into(Sessions.withGapDuration(...))
          ...)
      .apply(ParDo.of(new StatefulCount()));
  }
}

static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
  // Per key-and-window counter cell.
  @StateId("storage")
  private final StateSpec<ValueState<Integer>> storageSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window,
      @StateId("storage") ValueState<Integer> storage) {
    Integer val = storage.read();
    if (val == null) {
      val = 0;
    }
    int current = val.intValue();
    context.output(KV.of(context.element().getKey(), Long.valueOf(current)));
    // Store the incremented counter for the next element.
    storage.write(current + 1);
  }
}
