flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: "Last One" Window
Date Fri, 20 May 2016 13:05:19 GMT
Right now, the Streaming Join only works on windows, so it would not work
for your case.

You can implement an approximation of this using connected streams, i.e.:

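// ForecastErrorFunction is a placeholder name for your own CoFlatMapFunction;
// keyBy(0, 0) keys both inputs on field 0 so that partitioned state can be used
forecastedData.connect(realData).keyBy(0, 0).flatMap(new ForecastErrorFunction());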

The CoFlatMapFunction would receive the forecasts on the first input and
the real data on the second input. You would store one side internally
(for example, the latest forecast for each key), and when an element from
the other side arrives you can do the comparison and emit a result. You can
also use the partitioned state abstraction to keep that state in the
function.
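To make the idea concrete, here is a minimal plain-Scala sketch of the logic such a function could contain. It does not use the Flink API: the class name LatestForecastJoiner is hypothetical, the per-key mutable Map stands in for Flink's partitioned (keyed) state, values are assumed to be Doubles, and flatMap1/flatMap2 return a Seq instead of writing to a Collector as the real CoFlatMapFunction methods do. Since the two inputs arrive in no guaranteed relative order, "latest forecast" here only means "latest one seen so far", which is why this is an approximation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a CoFlatMapFunction: input 1 = forecasts, input 2 = real data.
final class LatestForecastJoiner[K] {
  // Latest forecast seen per key (in Flink this would be keyed state, not a local Map).
  private val latestForecast = mutable.Map.empty[K, Double]

  // Forecast side: remember the newest forecast for the key, emit nothing.
  def flatMap1(forecast: (K, Double)): Seq[(K, Double)] = {
    latestForecast(forecast._1) = forecast._2
    Seq.empty
  }

  // Real-data side: emit (key, real - forecast) if a forecast exists for the key.
  def flatMap2(real: (K, Double)): Seq[(K, Double)] =
    latestForecast.get(real._1) match {
      case Some(f) => Seq((real._1, real._2 - f))
      case None    => Seq.empty // no forecast yet for this key: drop the element
    }
}
```

For example, a real value of 101.0 for AAPL arriving after a forecast of 98.0 yields (AAPL, 3.0), while real values for keys without any forecast yet are dropped. Buffering them until a forecast arrives would be an alternative design.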

I have also recently started thinking about how we can put something like
this into the API in a more usable way. The current thoughts on this are
collected here:


On Thu, 19 May 2016 at 16:01 Artem Bogachev <artem.bogachev@ostrovok.ru> wrote:

> Hi,
> I’ve faced a problem trying to model our platform using Flink Streams.
> Let me describe our model:
> // Stream of data, e.g. stocks: (AAPL, 100.0), (GZMP, 100.0) etc.
> val realData: DataStream[(K, V)] = env.addSource(…)
> // Stream of forecasts (same format) based on some window aggregates
> val forecastedData: DataStream[(K, V)] =
>   realData.keyBy(0).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new Forecaster(…))
> I would like to construct an errors stream whose values are the
> differences between the realData stream and the latest available forecast
> for the same key in the forecastedData stream:
> // I suppose this solution does not guarantee that every realData value
> // will have a corresponding forecast
> val errors: DataStream[(K, V)] =
>   realData.join(forecastedData).where(0).equal(0)…
> Could you give some advice on how to implement such a pattern? Do I have to
> write custom windows?
> Artem
