flink-dev mailing list archives

From Gyula Fóra <gyf...@apache.org>
Subject Extending the streaming scala api with stateful functions
Date Fri, 24 Jul 2015 15:38:43 GMT

I would like to propose a way to extend the standard Streaming Scala API
methods (map, flatMap, filter, etc.) with versions that take stateful
functions as lambdas. I think this would eliminate the awkwardness of
implementing RichFunctions in Scala and make statefulness more explicit:

*For example:*
def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
def flatMap( statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]) )

This would be translated into RichMapFunctions and RichFlatMapFunctions that
store Option[S] as OperatorState for fault tolerance.
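
To make the intended translation concrete, here is a minimal sketch in plain Scala, with no Flink dependency. The class name StatefulMapper, the keyOf parameter and the demo object are hypothetical, and the state is held in an ordinary mutable map rather than in Flink's OperatorState, so this only illustrates the calling convention, not the fault-tolerance part.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the generated rich function. It keeps one
// Option[S] per key in a plain mutable map (an assumption for illustration;
// the proposal would store this as OperatorState instead).
class StatefulMapper[K, I, O, S](keyOf: I => K, f: (I, Option[S]) => (O, Option[S])) {
  private val state = mutable.Map.empty[K, S]

  def map(in: I): O = {
    val key = keyOf(in)
    // Hand the current state for this key to the user lambda.
    val (out, next) = f(in, state.get(key))
    // Store the returned state; None clears the state for this key.
    next match {
      case Some(s) => state.update(key, s)
      case None    => state.remove(key)
    }
    out
  }
}

object StatefulMapperDemo {
  def main(args: Array[String]): Unit = {
    // Running count of occurrences per word.
    val counter = new StatefulMapper[String, String, Int, Int](
      (word: String) => word,
      (word: String, count: Option[Int]) => {
        val c = count.getOrElse(0) + 1
        (c, Some(c))
      }
    )
    println(List("a", "b", "a", "a").map(w => counter.map(w))) // List(1, 1, 2, 3)
  }
}
```

The user-supplied lambda never touches the state store directly; it only receives the previous Option[S] and returns the next one, which is what would let the wrapper decide where and how the state is persisted.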

*Example rolling sum by key:*
val input: DataStream[Long] = ...
val sumByKey: DataStream[Long] =
    input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
        sum match {
            case Some(s) => (next + s, Some(next + s))
            case None => (next, Some(next))
        }
    )

What do you think?

