flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Working with State example /flink streaming
Date Wed, 25 Nov 2015 16:19:44 GMT
Hi Javier!

You can solve this either with windows or with manual state.

What is better depends a bit on when you want to have the result (the sum).
Do you want a result emitted after each update (or do some other operation
with that value) or do you want only the final sum after a certain time?

For the second variant, I would use a window. For the first variant, you
could use custom state as follows:

For each element, you take the current state for the key, add the value to
get the new sum. Then you update the state with the new sum and emit the
value as well...


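import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

DataStream<Tuple2<String, Long>> stream = ...;
DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());

public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Running sum, scoped to the key of the current element.
    private OperatorState<Long> sum;

    @Override
    public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
        long newSum = sum.value() + value.f1;
        sum.update(newSum);
        return new Tuple2<>(value.f0, newSum);
    }

    @Override
    public void open(Configuration config) throws Exception {
        // Key/value state named "myCounter"; unseen keys start at 0.
        sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}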
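
To make the difference between the two variants concrete without a Flink cluster, here is a minimal plain-Java sketch. It is not Flink code: the class RollingSumSketch and its methods are hypothetical names, and a HashMap stands in for the per-key state that Flink would manage. The first method emits a sum after every update, the second returns only the final sums, roughly what a window would give you once it closes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {

    // Stand-in for Flink's keyed state: one running sum per key.
    private final Map<String, Long> sums = new HashMap<>();

    // Same steps as map() in the message: read the current sum for the key
    // (0 if the key is new), add the value, store and return the new sum.
    public long update(String key, long value) {
        long newSum = sums.getOrDefault(key, 0L) + value;
        sums.put(key, newSum);
        return newSum;
    }

    // First variant: one output per input element.
    public static List<String> rolling(String[] keys, long[] values) {
        RollingSumSketch state = new RollingSumSketch();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            out.add(keys[i] + "=" + state.update(keys[i], values[i]));
        }
        return out;
    }

    // Second variant: only the final sum per key, after all input was seen.
    public static Map<String, Long> finalSums(String[] keys, long[] values) {
        RollingSumSketch state = new RollingSumSketch();
        for (int i = 0; i < keys.length; i++) {
            state.update(keys[i], values[i]);
        }
        return state.sums;
    }

    public static void main(String[] args) {
        String[] keys = {"11", "11", "12"};
        long[] values = {123L, 7L, 5L};
        System.out.println(rolling(keys, values)); // [11=123, 11=130, 12=5]
        System.out.println(finalSums(keys, values).get("11")); // 130
    }
}
```

In a real job the map would not live in the function; Flink keeps the state per key and checkpoints it, which is what the OperatorState handle gives you.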

In Scala, you can write this more concisely:

val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)  // mapWithState needs a keyed stream
  .mapWithState((in: (String, Int), sum: Option[Int]) => {
    val newSum = in._2 + sum.getOrElse(0)
    ( (in._1, newSum), Some(newSum) )
  })

Does that help?

Thanks also for pointing out the error in the sample code...


On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lopez@zalando.de> wrote:

> Hi,
> We are trying to do a test using States but we have not been able to
> achieve our desired result. Basically we have a data stream with data as
> [{"id":"11","value":123}] and we want to calculate the sum of all values
> grouping by ID. We were able to achieve this using windows but not with
> states. The example that is in the documentation (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
> is not very clear and even has some errors, for example:
> public class CounterSum implements RichReduceFunction<Long>
> should be
> public class CounterSum extends RichReduceFunction<Long>
> as RichReduceFunction is a class, not an interface.
> We wanted to ask you if you have an example of how to use States with
> Flink.
> Thanks in advance for your help.
