flink-user mailing list archives

From Jiawei Wu <wujiawei5837...@gmail.com>
Subject Re: Use flink to calculate sum of the inventory under certain conditions
Date Wed, 11 Mar 2020 02:06:27 GMT
Hi Robert,

Your answer really helps.

About the problem, we have two choices. The first is to use Flink as
described in this email thread. The second is to use AWS Lambda,
triggered by the CDC stream, to compute over the latest 15 days of records.
That is a workaround and looks less elegant than Flink to me.

For now we have decided on AWS Lambda because we are familiar with it and,
most importantly, it adds almost no operational burden. But we are
actively looking for a comparison between Lambda and Flink and want to
know in which situations we should prefer Flink over Lambda. Several teams
in our company are already in a heated debate about this, and the biggest
concern is Flink's non-functional requirements, such as fault
tolerance, recovery, etc.

I also searched the internet but found almost no comparisons
between Lambda and Flink apart from their market share :-( What do you
think about this? Any comments from the Flink community would be
appreciated.


On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger <rmetzger@apache.org> wrote:

> Hey Jiawei,
> I'm sorry that you haven't received an answer yet.
> So you basically have a stream of DynamoDB table updates (let's call it
> the CDC stream), and you would like to maintain the inventory of the last 15
> days for each vendor.
> Whenever there's an update in the inventory data (a new event arrives in
> the CDC stream), you want to produce a new event with the inventory count.
> If I'm not mistaken, you will need to keep all the inventory in Flink's
> state to have an accurate count and to drop old records when they are
> expired.
> There are two options for maintaining the state:
> - in memory (using the FsStateBackend)
> - on disk (using the embedded RocksDBStateBackend)
> I would recommend starting with the RocksDBStateBackend. It will work as
> long as your state fits on all your machines' hard disks (you'll probably
> not have an issue there :) )
> If you run into performance issues, you can consider switching to a
> memory-based backend (by then, you should have some knowledge about your
> state size)
> For tracking the events, I would recommend looking into Flink's
> windowing API:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
> Alternatively, you could implement it with a ProcessFunction:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
> I personally would give it a try with ProcessFunction first.
> For reading the data from DynamoDB, there's an undocumented feature for it
> in Flink. This is an example for reading from a DynamoDB stream:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
> For writing to DynamoDB there is currently no official sink in Flink. It
> should be fairly straightforward to implement a Sink using the SinkFunction
> interface of Flink.
> I hope this answers your question.
> Best,
> Robert
> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu <wujiawei5837464@gmail.com>
> wrote:
>> Hi flink users,
>> We have a problem and think Flink may be a good solution for it. But
>> I'm new to Flink and hope to get some insights from the Flink community :)
>> Here is the problem. Suppose we have a DynamoDB table which stores the
>> inventory data, with a schema like:
>> * vendorId (primary key)
>> * inventory name
>> * inventory units
>> * inbound time
>> ...
>> This DDB table keeps changing, since inventory keeps arriving and being
>> removed. *Every change triggers a DynamoDB stream event.*
>> We need to calculate *all the inventory units that are older than 15 days
>> for a specific vendor*, like this:
>> > select vendorId, sum(inventory units)
>> > from dynamodb
>> > where today's time - inbound time > 15
>> > group by vendorId
>> We don't want to schedule a daily batch job, so we are trying to work on
>> a micro-batch solution in Flink, and publish this data to another DynamoDB
>> table.
>> A draft idea is to take the total units minus the units younger than 15
>> days, since both of them are updated by event triggers. But we have no
>> detailed solution yet.
>> Could anyone help provide some insights here?
>> Thanks,
>> J.
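
The draft idea from the original question (total units minus the units
younger than 15 days, recomputed on every CDC event) can be sketched in
plain Python without any Flink dependency. This is only an illustration
under assumptions, not the approach anyone in the thread settled on: the
event field names ("type", "vendorId", "inventoryName", "units",
"inboundTime"), keying rows by inventory name within a vendor, and the fixed
`now` value are all hypothetical. The per-vendor object plays the role that
keyed state would play in a ProcessFunction.

```python
from collections import defaultdict
from datetime import datetime, timedelta

AGE_THRESHOLD = timedelta(days=15)


class VendorInventory:
    """Per-vendor state, analogous to keyed state in a ProcessFunction."""

    def __init__(self):
        # inventory name -> (units, inbound_time); one entry per live row.
        # Keying by inventory name is an assumption about the table layout.
        self.records = {}

    def upsert(self, name, units, inbound_time):
        self.records[name] = (units, inbound_time)

    def remove(self, name):
        self.records.pop(name, None)

    def aged_units(self, now):
        # Draft idea from the thread: total units minus "young" units.
        total = sum(units for units, _ in self.records.values())
        young = sum(units for units, inbound in self.records.values()
                    if now - inbound <= AGE_THRESHOLD)
        return total - young


def apply_event(state, event, now):
    """Apply one CDC event; return (vendor_id, aged_units) to publish."""
    vendor = state[event["vendorId"]]
    if event["type"] == "REMOVE":
        vendor.remove(event["inventoryName"])
    else:  # INSERT or MODIFY
        vendor.upsert(event["inventoryName"], event["units"],
                      event["inboundTime"])
    return event["vendorId"], vendor.aged_units(now)


state = defaultdict(VendorInventory)
now = datetime(2020, 3, 11)  # hypothetical processing time
events = [
    {"type": "INSERT", "vendorId": "v1", "inventoryName": "bolts",
     "units": 10, "inboundTime": datetime(2020, 2, 1)},
    {"type": "INSERT", "vendorId": "v1", "inventoryName": "nuts",
     "units": 5, "inboundTime": datetime(2020, 3, 5)},
    {"type": "MODIFY", "vendorId": "v1", "inventoryName": "bolts",
     "units": 7, "inboundTime": datetime(2020, 2, 1)},
    {"type": "REMOVE", "vendorId": "v1", "inventoryName": "bolts"},
]
results = [apply_event(state, e, now) for e in events]
```

One gap this sketch leaves open: a record that crosses the 15-day boundary
produces no CDC event, so the published count goes stale until the next
update for that vendor. In a Flink job, registering a timer for inbound time
plus 15 days in a keyed ProcessFunction would be one way to re-emit the count
at that moment; with Lambda, something else would have to trigger the
recomputation.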
