Suggestions inline below...
On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk.net@gmail.com> wrote:
> Hi,
>
> I'm working on a project which uses Flink to compute hourly log statistics
> like topK. The logs are fetched from Kafka by a FlinkKafkaProducer and packed
> into a DataStream.
>
> The problem is, I find the computation quite challenging to express with
> Flink's DataStream API:
>
> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose that the
> data volume is really high, e.g., billions of logs might be generated in one
> hour, will the window grow too large and can't be handled efficiently?
>
In the general case you can use:

    stream
        .timeWindow(...)
        .apply(reduceFunction, windowFunction)

which takes a ReduceFunction and a WindowFunction. The ReduceFunction
reduces the state on the fly as elements arrive, which keeps the state
you accumulate for each window small. This is a common pattern in
analytics applications. In the specific case of TopK, however, you cannot
do this if you want an exact result. To get an exact result I believe you
have to keep all of the data around and then calculate TopK at the end in
your WindowFunction. If you are able to use approximate algorithms for
your use case, then you can calculate a probabilistic incremental TopK
based on some sort of sketch-based algorithm.
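
To make the reduce-then-window idea concrete, here is a minimal plain-Java
sketch of what happens inside one window. It is not Flink code: the class
IncrementalWindow and its add/fire methods are names I made up for
illustration. The point is only that the window holds a single reduced
value instead of every element.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical stand-in for the state of one window; not a Flink class. */
final class IncrementalWindow<T> {
    private final BinaryOperator<T> reduceFunction;
    private T state; // the only value kept for the whole window

    IncrementalWindow(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Called for every arriving element: fold it into the running state. */
    void add(T element) {
        state = (state == null) ? element : reduceFunction.apply(state, element);
    }

    /** Called once when the window fires: post-process the single reduced value. */
    <R> R fire(Function<T, R> windowFunction) {
        R result = windowFunction.apply(state);
        state = null; // the window is cleared after it fires
        return result;
    }
}
```

With a sum as the reduce function, adding a billion elements still leaves
one number in the window, and the window function only sees that number
when the window fires.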
>
> 2. We have to create a `KeyedStream` before applying `timeWindow`. However,
> the distribution of some keys are skewed hence using them may compromise
> the performance due to unbalanced partition loads. (What I want is just
> rebalance the stream across all partitions.)
>
A good and simple way to approach this may be to come up with a composite
key for your data that *is* uniformly distributed. You can imagine
something simple like 'natural_key:random_number'. Key by that composite
key and reduce() to pre-aggregate, then keyBy(natural_key) and reduce()
again. For example:

    stream
        .keyBy(key, rand())   // partition by a composite key that is uniformly distributed
        .timeWindow(1 hour)
        .reduce()             // pre-aggregation
        .keyBy(key)           // repartition by the natural key
        .timeWindow(1 hour)
        .reduce()             // final aggregation
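
The two stages above can be simulated in plain Java, without Flink, to see
why the result is unchanged. This is only a sketch under my own
assumptions: SaltedCount, preAggregate, finalAggregate, and the bucket
count are hypothetical names and parameters, and the "aggregation" is a
simple per-key count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical illustration of salted two-stage counting; not Flink code. */
final class SaltedCount {
    /** Stage 1: count per composite key "naturalKey:salt". */
    static Map<String, Long> preAggregate(List<String> keys, int buckets, Random rng) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            // The random salt spreads a hot key over up to `buckets` partitions.
            String composite = key + ":" + rng.nextInt(buckets);
            partial.merge(composite, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and merge the partial counts per natural key. */
    static Map<String, Long> finalAggregate(Map<String, Long> partial) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String composite = e.getKey();
            String naturalKey = composite.substring(0, composite.lastIndexOf(':'));
            totals.merge(naturalKey, e.getValue(), Long::sum);
        }
        return totals;
    }
}
```

The second stage sees at most `buckets` records per natural key, so the
skew is absorbed by the first stage. This only works because the
aggregation (here a sum) can be combined from partial results.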
>
> 3. The topK algorithm can be straightforwardly implemented with `DataSet`'s
> `mapPartition` and `reduceGroup` API as in
> [FLINK2549](https://github.com/apache/flink/pull/1161/), but not so easy if
> taking the DataStream approach, even with the stateful operators. I still
> cannot figure out how to reunion streams once they are partitioned.
>
I'm not sure I know what you're trying to do here. What do you mean by
reunion?
>
> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
> make Flink analyze the data incrementally rather than aggregating the logs for
> one hour before starting to process?
>
There is no direct way to turn a DataStream into a DataSet. I addressed
the point about doing the computation incrementally above, though: you do
this with a ReduceFunction. But again, I'm not aware of an exact
incremental TopK algorithm. An approximate one can be done with
sketching, though.
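
As one possible example of an approximate incremental TopK, here is a
minimal plain-Java sketch of the Space-Saving algorithm. That is my choice
for illustration, not something specific to Flink, and strictly speaking
it is a counter-based summary rather than a hash-based sketch. The class
and method names (SpaceSavingTopK, offer, topK) are hypothetical. It keeps
a fixed number of counters no matter how many distinct items arrive, and
the counts it reports may overestimate the true counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical Space-Saving summary with a fixed number of counters. */
final class SpaceSavingTopK {
    private final int capacity; // maximum number of monitored items, assumed > 0
    private final Map<String, Long> counters = new HashMap<>();

    SpaceSavingTopK(int capacity) {
        this.capacity = capacity;
    }

    /** Process one item in bounded memory. */
    void offer(String item) {
        if (counters.containsKey(item) || counters.size() < capacity) {
            counters.merge(item, 1L, Long::sum);
            return;
        }
        // All counters are taken: evict the smallest one and let the new
        // item inherit its count plus one (this is where overestimation enters).
        String minKey = null;
        long minCount = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            if (e.getValue() < minCount) {
                minCount = e.getValue();
                minKey = e.getKey();
            }
        }
        counters.remove(minKey);
        counters.put(item, minCount + 1);
    }

    /** Return up to k monitored items, highest estimated count first. */
    List<String> topK(int k) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counters.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

The linear scan for the minimum keeps the example short; a real
implementation would use a smarter structure. Because each offer() only
touches the fixed-size counter map, the update could run per element
rather than after the window has collected everything.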

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@dataartisans.com
