From: Maximilian Michels
Date: Wed, 25 Nov 2015 17:37:04 +0100
Subject: Re: Working with State example /flink streaming
To: "user@flink.apache.org"

Hi Javier,

Thanks for your question. I've corrected the documentation (will be
online soon).

Cheers,
Max

On Wed, Nov 25, 2015 at 5:19 PM, Stephan Ewen wrote:
> Hi Javier!
>
> You can solve this either with windows or with manually managed state.
>
> Which is better depends on when you want to have the result (the sum).
> Do you want a result emitted after each update (or to do some other
> operation with that value), or do you want only the final sum after a
> certain time?
>
> For the second variant, I would use a window. For the first variant, you
> could use custom state as follows:
>
> For each element, you take the current state for the key and add the
> value to get the new sum. Then you update the state with the new sum and
> emit the value as well...
>
> Java:
>
> // Element type assumed to be Tuple2<String, Long> (key, value).
> DataStream<Tuple2<String, Long>> stream = ...;
>
> DataStream<Tuple2<String, Long>> result =
>     stream.keyBy(0).map(new RollingSum());
>
>
> public class RollingSum
>         extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>
>     // Running sum for the current key.
>     private OperatorState<Long> sum;
>
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
>
>     @Override
>     public void open(Configuration config) {
>         sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>     }
> }
>
>
> In Scala, you can write this briefly as:
>
> val stream: DataStream[(String, Int)] = ...
>
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ( (in._1, newSum), Some(newSum) )
>   })
>
>
> Does that help?
>
> Thanks also for pointing out the error in the sample code...
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier wrote:
>>
>> Hi,
>>
>> We are trying to do a test using States but we have not been able to
>> achieve our desired result. Basically we have a data stream with data as
>> [{"id":"11","value":123}] and we want to calculate the sum of all values
>> grouping by ID. We were able to achieve this using windows but not with
>> states.
>> The example that is in the documentation
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
>> is not very clear and even has some errors, for example:
>>
>> public class CounterSum implements RichReduceFunction
>>
>> should be
>>
>> public class CounterSum extends RichReduceFunction
>>
>> as RichReduceFunction is a class, not an interface.
>>
>> We wanted to ask you if you have an example of how to use States with
>> Flink.
>>
>> Thanks in advance for your help.
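
The per-key update described in this thread (read the current sum for the key, add the incoming value, store the new sum, emit it) can be tried out without a Flink setup. The following is only a plain-Java simulation under stated assumptions: the class RollingSumSketch, its methods process and run, and the HashMap standing in for Flink's keyed state are hypothetical names chosen for illustration and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed rolling sum; no Flink classes are used.
final class RollingSumSketch {

    // Stands in for Flink's per-key state: one running sum per key.
    private final Map<String, Long> sumByKey = new HashMap<>();

    // Mirrors the map() step: read the state (default 0), add the value,
    // write the new sum back, and emit it together with the key.
    String process(String key, long value) {
        long newSum = sumByKey.getOrDefault(key, 0L) + value;
        sumByKey.put(key, newSum);
        return key + "=" + newSum;
    }

    // Feeds a small batch through a fresh instance and collects every emitted record.
    static List<String> run(String[] keys, long[] values) {
        RollingSumSketch op = new RollingSumSketch();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            out.add(op.process(keys[i], values[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"11", "12", "11"};
        long[] values = {123L, 5L, 7L};
        System.out.println(run(keys, values)); // prints [11=123, 12=5, 11=130]
    }
}
```

One record is emitted per input element, which matches the "result after each update" variant; the window variant would instead emit a single sum per key when the window closes.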