flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Working with State example /flink streaming
Date Fri, 27 Nov 2015 10:09:50 GMT
Hi,
I’ll try to go into a bit more detail about the windows here. What you can do is this:

// fields are (id, sum, count), where count is initialized to 1, similar to word count
DataStream<Tuple3<String, Double, Long>> input = …;

DataStream<Tuple3<String, Double, Long>> counts = input
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(new MyCountingReducer());

DataStream<Tuple3<String, Double, Long>> result = counts.map( <mapper that divides sum by count> );

Does this help? Here, you don’t even have to deal with state yourself: the windowing system
keeps the state (i.e. the reduced value) internally in a fault-tolerant fashion.
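
The thread does not show MyCountingReducer or the averaging mapper, so the following is only a sketch of the logic they would need, written in plain Java without Flink dependencies. The class WindowAverageSketch, the Agg type (a stand-in for Tuple3<String, Double, Long>), and the reduceWindow helper are hypothetical names used for illustration; reduceWindow merely imitates what a keyed window does when it applies a reduce function to every record that arrives for a key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowAverageSketch {

    /** Hypothetical stand-in for Tuple3<String, Double, Long>: (id, sum, count). */
    public static final class Agg {
        public final String id;
        public final double sum;
        public final long count;

        public Agg(String id, double sum, long count) {
            this.id = id;
            this.sum = sum;
            this.count = count;
        }
    }

    /** Logic a counting reducer would need: add sums and counts of two records with the same key. */
    public static Agg reduce(Agg a, Agg b) {
        return new Agg(a.id, a.sum + b.sum, a.count + b.count);
    }

    /** Logic the follow-up mapper would need: divide sum by count. */
    public static double average(Agg a) {
        return a.sum / a.count;
    }

    /** Imitates one window: folds all records per key and yields a single result per key. */
    public static Map<String, Agg> reduceWindow(List<Agg> window) {
        Map<String, Agg> perKey = new LinkedHashMap<>();
        for (Agg record : window) {
            perKey.merge(record.id, record, WindowAverageSketch::reduce);
        }
        return perKey;
    }

    public static void main(String[] args) {
        // Values 1, 2, 3 for id "11", each with count initialized to 1.
        List<Agg> window = Arrays.asList(
                new Agg("11", 1.0, 1L),
                new Agg("11", 2.0, 1L),
                new Agg("11", 3.0, 1L));
        Agg result = reduceWindow(window).get("11");
        // prints: 11 sum=6.0 count=3 avg=2.0
        System.out.println(result.id + " sum=" + result.sum
                + " count=" + result.count + " avg=" + average(result));
    }
}
```

In an actual job, the body of reduce would go into a ReduceFunction and the body of average into a MapFunction; only one record per key and window reaches the mapper, which is why a single (11, 6.0, 3) comes out instead of the consecutive partial sums.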

Cheers,
Aljoscha
> On 26 Nov 2015, at 17:14, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi!
> 
> In streaming, there is no "end" of the stream at which you would emit the final sum. That's
> why there are windows.
> 
> If you do not want the partial sums, but only the final sum, you need to define the
> window in which the sum is computed. At the end of that window, the value is emitted. The
> window can be based on time, counts, or other measures.
> 
> Greetings,
> Stephan
> 
> 
> On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier <javier.lopez@zalando.de> wrote:
> Hi, thanks for the answer. It worked, but not in the way we expected. We expect to have
> only one sum per ID, but we are getting all the consecutive sums. For example:
> 
> We expect this: (11,6), but we get this: (11,1) (11,3) (11,6) (the initial values are ID
> -> 11, values -> 1,2,3). Here is the code we are using for our test:
> 
> DataStream<Tuple2<String, Double>> stream = ...;
> 
> DataStream<Tuple4<String, Double, Long, Double>> result = stream.keyBy(0).map(new RollingSum());
> 
> public static class RollingSum extends RichMapFunction<Tuple2<String, Double>, Tuple4<String, Double, Long, Double>> {
> 
>     // persistent counter
>     private OperatorState<Double> sum;
>     private OperatorState<Long> count;
> 
>     @Override
>     public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value1) {
>         try {
>             Double newSum = sum.value() + value1.f1;
> 
>             sum.update(newSum);
>             count.update(count.value() + 1);
>             return new Tuple4<String, Double, Long, Double>(value1.f0, sum.value(), count.value(), sum.value() / count.value());
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
> 
>         return null;
>     }
> 
>     @Override
>     public void open(Configuration config) {
>         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
>         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>     }
> }
> 
> 
> We are using a Tuple4 because we want to calculate the sum and the average (so our Tuple
> is ID, SUM, Count, AVG). Do we need to add another step to get a single value out of it, or
> is this the expected behavior?
> 
> Thanks again for your help.
> 
> On 25 November 2015 at 17:19, Stephan Ewen <sewen@apache.org> wrote:
> Hi Javier!
> 
> You can solve this either using windows or using manual state.
> 
> Which is better depends a bit on when you want to have the result (the sum). Do you want
> a result emitted after each update (or do some other operation with that value), or do you
> want only the final sum after a certain time?
> 
> For the second variant, I would use a window; for the first variant, you could use custom
> state as follows:
> 
> For each element, you take the current state for the key and add the value to get the new
> sum. Then you update the state with the new sum and emit the value as well...
> 
> Java:
> 
> DataStream<Tuple2<String, Long>> stream = ...;
> 
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
> 
> public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
> 
>     // running sum, kept per key
>     private OperatorState<Long> sum;
> 
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
>         // sum.value() can throw an IOException, hence the throws clause
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
> 
>     @Override
>     public void open(Configuration config) {
>         // assign to the declared field "sum" (the original assigned to an undeclared "counter")
>         sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>     }
> }
> 
> 
> In Scala, you can write this briefly as:
> 
> val stream: DataStream[(String, Int)] = ...
> 
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ((in._1, newSum), Some(newSum))
>   })
> 
> Does that help?
> 
> Thanks also for pointing out the error in the sample code...
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lopez@zalando.de> wrote:
> Hi,
> 
> We are trying to run a test using state, but we have not been able to achieve our desired
> result. Basically, we have a data stream with data such as [{"id":"11","value":123}] and we want
> to calculate the sum of all values, grouped by ID. We were able to achieve this using windows
> but not with state. The example in the documentation (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
> is not very clear and even has some errors, for example:
> 
> public class CounterSum implements RichReduceFunction<Long>
> should be
> public class CounterSum extends RichReduceFunction<Long>
> as RichReduceFunction is a class, not an interface.
> 
> We wanted to ask you if you have an example of how to use States with Flink. 
> 
> Thanks in advance for your help.
> 
>  
> 
> 
> 

