beam-user mailing list archives

From Lukasz Cwik <>
Subject Re: Usecase scenario: Job definition from low frequently changing storage
Date Mon, 18 Dec 2017 18:49:06 GMT
1) Based upon your question, it seems like you want to have one large job
that then spins off really small jobs. Any reason you can't just do it all
within one pipeline?

2. Am I capable of defining flexible window sizes per "device-task"?
Yes, you'll want a custom WindowFn which, when going through AssignWindows,
is able to introspect the element and timestamp and produce any window for
it. You'll also want to think about what happens when an aggregation period
decreases/increases in size and how that impacts any currently ongoing windows.
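As a plain-Java illustration of the per-element bucket computation such a custom WindowFn would perform in its assign step (the class, method names, and fixed-bucket alignment here are assumptions for the sketch, not Beam's API):

```java
import java.time.Duration;
import java.time.Instant;

public class DeviceWindows {
    // Computes the start of the fixed bucket containing `ts`, where the
    // bucket size can differ per element (e.g. read from that device's
    // job definition) rather than being fixed for the whole pipeline.
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        long start = millis - Math.floorMod(millis, size.toMillis());
        return Instant.ofEpochMilli(start);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2017-12-18T10:30:00Z");
        // The same timestamp lands in differently sized buckets
        // depending on the device's configured aggregation period.
        System.out.println(windowStart(ts, Duration.ofMinutes(13)));
        System.out.println(windowStart(ts, Duration.ofMinutes(17)));
    }
}
```

A real WindowFn would return an IntervalWindow of [windowStart, windowStart + size) from assignWindows.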

3) Unfortunately, window assignment/merging doesn't allow direct access to
side inputs. To get around this, you'll want to "join" each element with its
window-assignment configuration before doing the AssignWindows, and then
introspect that configuration when you look at the element. Using a cache
like Redis is an alternative approach, but it requires hosting/setting it
up and configuring the refresh policy to discover key changes.
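A minimal sketch of that "join first, then window" shape in plain Java (the types and the config lookup are hypothetical, not Beam's API; in a pipeline the map would come from a side input or an enrichment step over the SQL-backed definitions):

```java
import java.time.Duration;
import java.util.Map;

public class ConfigJoin {
    // Hypothetical per-device windowing configuration.
    record WindowConfig(Duration size) {}
    // Hypothetical raw event, and the same event enriched with its
    // configuration so a downstream WindowFn can introspect it.
    record Event(String deviceId, long timestampMillis) {}
    record ConfiguredEvent(Event event, WindowConfig config) {}

    // The "join": pair each element with the current configuration for its
    // device, falling back to a default 60-minute period when no
    // device-specific entry exists.
    static ConfiguredEvent join(Event e, Map<String, WindowConfig> configs) {
        WindowConfig c = configs.getOrDefault(
                e.deviceId(), new WindowConfig(Duration.ofMinutes(60)));
        return new ConfiguredEvent(e, c);
    }

    public static void main(String[] args) {
        Map<String, WindowConfig> configs =
                Map.of("sensor-17", new WindowConfig(Duration.ofMinutes(13)));
        System.out.println(join(new Event("sensor-17", 0L), configs).config().size());
        System.out.println(join(new Event("unknown", 0L), configs).config().size());
    }
}
```

The custom WindowFn then reads `config().size()` off the element during window assignment instead of consulting a side input.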

On Sun, Dec 17, 2017 at 12:56 PM, Theo Diefenthal <> wrote:

> Hi there,
> I'm currently evaluating Apache Beam as stream processing engine for my
> current project and hope you can help me out with some architectural
> questions.
> Let me first describe what I'm willing to do:
> I have lots of IoT sensor data coming in from an Azure EventHub. For
> each message, I can uniquely identify the device it came from. For most
> of the devices, I have a processing "job-description" which always has a
> format like this: "If the event timestamp from that device is in
> [2017-01-01; 2017-12-31], slide that interval into 60 minute "buckets"
> and do an average aggregation on each bucket. When? At the end of the
> current bucket-window in processing time as well as for each
> (indefinitely) late arrival, write the aggregation result out to my time
> series database". Note that thanks to my time series database, I don't
> need a state store for very late arrivals; I can instead do the
> (possibly expensive) late-arrival re-aggregation via a query against the
> time series database. Thus, I'm thinking of using Beam in a kind of
> "trigger-only" mode for very late arrivals, so as not to keep too much
> state, and of using state only for a watermark spanning a few windows
> (into which 99.999% of the event data will fall).
> My problem here is that not only the interval can change for each job
> definition, but also the aggregation period may change arbitrarily. I
> can have device jobs with 13-minute aggregation windows, with 17-minute
> aggregation windows, and basically with any user-set aggregation period.
> My questions are the following:
> 1. Am I right that I should define one large job which takes in all
> events from the EventHub, groups by my device key, and does the
> processing for each device?
> 2. Am I capable of defining flexible window sizes per "device-task"?
> Currently, this does not look possible to me, as the window size seems
> to be static in a job definition?!
> 2.a) If not possible, am I able to workaround this issue, e.g. by
> defining a unique job per different aggregation period? Will I face
> major performance issues with that?
> 3. What would be the best practice for storing/querying my infrequently
> changing "device job definitions"? I can have millions of jobs, but only
> very few of them will change each day. I plan to store those definitions
> in a SQL database. Should I make changes to them directly accessible to
> the pipeline via a side input? Or should I put them in some sort of
> cache like Redis?
> I am looking forward to hearing from you.
> Best regards
> Theo
