flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets
Date Wed, 13 Jul 2016 08:08:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374568#comment-15374568 ]

Aljoscha Krettek commented on FLINK-4190:

What you could also do is create a new package and put the new sink and its related classes
there. That way you wouldn't have to rename {{Bucketed}} to {{BucketFunction}}, and the new
code would be nicely isolated from the existing sink.

> Generalise RollingSink to work with arbitrary buckets
> -----------------------------------------------------
>                 Key: FLINK-4190
>                 URL: https://issues.apache.org/jira/browse/FLINK-4190
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector, Streaming Connectors
>            Reporter: Josh Forman-Gornall
>            Assignee: Josh Forman-Gornall
>            Priority: Minor
> The current RollingSink implementation appears to be intended for writing to directories
> that are bucketed by system time (e.g. minutely) and to only be writing to one file within
> one bucket at any point in time. When the system time determines that the current bucket should
> be changed, the current bucket and file are closed and a new bucket and file are created.
> The sink cannot be used for the more general problem of writing to arbitrary buckets, perhaps
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used for more
> general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an attribute
>   of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes the file
>   and moves on to the new bucket. Therefore the sink cannot have more than one bucket/file open
>   at a time. Additionally the checkpointing mechanics only support saving the state of one active
>   bucket and file.
> - The sink determines that it should 'close' an active bucket and file when the bucket
>   path changes. We need another way to determine when a bucket has become inactive and needs
>   to be closed.
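
The three limitations above can be illustrated with a minimal, self-contained sketch. It does not use Flink's API or reflect how the issue was eventually resolved: the class {{MultiBucketWriter}}, its methods, and the in-memory "files" are all hypothetical names chosen for illustration. It assumes the bucket path is derived from the element by a user-supplied function, that several buckets may be open at once, and that a bucket is closed once it has received no writes for a configurable threshold rather than when the bucket path changes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

/** Hypothetical sketch only; this is not Flink's RollingSink API. */
class MultiBucketWriter<T> {

    /** State kept per open bucket: buffered records and the last write time. */
    static final class BucketState {
        final List<String> lines = new ArrayList<>();
        long lastWriteMillis;
    }

    // Maps an element to its bucket path (limitation 1: not tied to system time).
    private final Function<T, String> bucketer;
    private final long inactiveThresholdMillis;
    // All currently open buckets (limitation 2: more than one at a time).
    // A checkpoint would need to capture this whole map, not a single bucket.
    private final Map<String, BucketState> open = new HashMap<>();
    // Stand-in for finalized files, keyed by bucket path.
    private final Map<String, List<String>> closed = new HashMap<>();

    MultiBucketWriter(Function<T, String> bucketer, long inactiveThresholdMillis) {
        this.bucketer = bucketer;
        this.inactiveThresholdMillis = inactiveThresholdMillis;
    }

    /** Appends the element to its bucket, opening the bucket if necessary. */
    void write(T element, long nowMillis) {
        String path = bucketer.apply(element);
        BucketState state = open.computeIfAbsent(path, p -> new BucketState());
        state.lines.add(String.valueOf(element));
        state.lastWriteMillis = nowMillis;
    }

    /** Closes buckets idle for at least the threshold (limitation 3). */
    void closeInactive(long nowMillis) {
        Iterator<Map.Entry<String, BucketState>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, BucketState> entry = it.next();
            if (nowMillis - entry.getValue().lastWriteMillis >= inactiveThresholdMillis) {
                closed.computeIfAbsent(entry.getKey(), p -> new ArrayList<>())
                      .addAll(entry.getValue().lines);
                it.remove();
            }
        }
    }

    /** Returns a sorted copy of the paths of all open buckets. */
    Set<String> openBuckets() {
        return new TreeSet<>(open.keySet());
    }

    /** Returns the records written to a bucket that has been closed. */
    List<String> closedContents(String path) {
        return closed.getOrDefault(path, Collections.emptyList());
    }
}
```

In a real sink the inactivity check would presumably be driven by a timer or by the arrival of new elements, and the per-bucket state would hold file handles and valid-length offsets rather than in-memory lists. Those details are outside the scope of this sketch.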

This message was sent by Atlassian JIRA
