flink-user mailing list archives

From Timothy Victor <vict...@gmail.com>
Subject Re: Processing post to sink?
Date Sat, 14 Dec 2019 15:27:31 GMT
Why not implement your own SinkFunction, or maybe inherit from the one you
are using now?

Tim

On Sat, Dec 14, 2019, 8:52 AM Theo Diefenthal <
theo.diefenthal@scoop-software.de> wrote:

> Hi there,
>
> In my pipeline, I write data into a partitioned parquet directory via
> StreamingFileSink and a partitioner like:
>
> @Override
> public String getBucketId(V element, Context context) {
>     return "partitionkey=" + element.getPartitionkey();
> }
>
> That works well so far.
>
> Now I need to know when all sink instances are fully "done" with a
> partition in order to trigger some export jobs (for machine learning/model
> training) and also notify Impala about the new (final) partition.
>
> In my case, I can well define "fully done". The partitionkey is directly
> deduced from event time and my watermarks guarantee no late arrivals. So
> once a watermark passes a certain event time, I know that the prior
> partition is completed and can trigger my stuff. Well, not directly: once
> the watermark passes, I need to wait for the next checkpoint to
> complete, because only then are the parquet files committed and the
> partition fully written.
>
> The question is: How do I implement my "partition-completed" condition
> check in Flink? It pretty much comes down to wanting to do some
> processing _after_ a sink, based on the sink's progress
> (watermark + checkpoints).
>
> The only idea I have come up with so far is: Make the sink a process-function
> which also emits elements. Only on a completed checkpoint, emit an element
> with the current watermark downstream. In the next step, assign event
> timestamps based on these events and merge the parallel subtasks into one,
> thus keeping track of the global watermark. In the task with parallelism 1,
> I could then issue my impala queries and export jobs. (Which should not be
> called by multiple parallel instances simultaneously).
>
> Do you have any better ideas for implementation or is this the best way to
> go? I thought about just building a custom sink inheriting from
> StreamingFileSink, but then I don't know how to trigger my jobs only once.
> I _could_ check for my sink parallel subtask index to be something like 0
> and only in that case trigger the subtasks, but I have heavy skew in my
> parallel instances: Some process millions of elements, whereas others
> process just 10 events a day. If my "notification-sink-subtask" ended
> up on a partition with those few events, I would get new triggers far
> too rarely. Furthermore, I wouldn't know whether the other instances had
> already committed their parquet files.
>
> What kind of problems do I need to expect when making a sink a
> process-function?
>
> Best regards
> Theo
>
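
The merge step proposed in the quoted message (parallelism 1, tracking a
global watermark built from what each sink subtask reports after a completed
checkpoint) could be sketched roughly as below. This is plain Java with no
Flink dependency, only to illustrate the bookkeeping. The class
PartitionCompletionTracker and its method names are hypothetical, and it
assumes fixed-width, event-time-based partitions (for example hourly), which
the original message does not state.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical helper (not a Flink API). For each sink subtask it remembers the
 * watermark reported after that subtask's last completed checkpoint, and derives
 * the highest partition that every subtask has fully committed.
 */
class PartitionCompletionTracker {
    private static final long NOT_REPORTED = Long.MIN_VALUE;

    private final long[] committedWatermarks; // one slot per sink subtask
    private final long partitionSizeMs;       // assumption: fixed-width partitions
    private long highestAnnounced = Long.MIN_VALUE;

    PartitionCompletionTracker(int sinkParallelism, long partitionSizeMs) {
        if (sinkParallelism <= 0 || partitionSizeMs <= 0) {
            throw new IllegalArgumentException("parallelism and partition size must be positive");
        }
        this.committedWatermarks = new long[sinkParallelism];
        Arrays.fill(this.committedWatermarks, NOT_REPORTED);
        this.partitionSizeMs = partitionSizeMs;
    }

    /**
     * Records that a subtask completed a checkpoint while its watermark was
     * {@code watermark}. Returns the index of the highest partition that is now
     * complete on all subtasks, or empty if nothing new was completed.
     * Partition k covers event times [k * size, (k + 1) * size).
     */
    OptionalLong onCheckpointCommitted(int subtaskIndex, long watermark) {
        // Watermarks should never move backwards; keep the maximum defensively.
        committedWatermarks[subtaskIndex] =
                Math.max(committedWatermarks[subtaskIndex], watermark);

        // The global progress is bounded by the slowest subtask.
        long global = Arrays.stream(committedWatermarks).min().getAsLong();
        if (global == NOT_REPORTED) {
            return OptionalLong.empty(); // some subtask has not reported yet
        }

        // Partition k is complete once its last timestamp, (k + 1) * size - 1,
        // is at or below the global watermark.
        long highestComplete = Math.floorDiv(global - partitionSizeMs + 1, partitionSizeMs);
        if (highestComplete <= highestAnnounced) {
            return OptionalLong.empty();
        }
        highestAnnounced = highestComplete;
        return OptionalLong.of(highestComplete);
    }
}
```

Because the result only changes when the slowest subtask's committed
watermark advances, a subtask that sees very few events does not trigger
anything early. It can still hold back the result if its watermark does not
advance, so the sketch relies on watermarks reaching all subtasks.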
