From: "Rinat Sharipov (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Thu, 11 Oct 2018 10:07:00 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

    [ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646230#comment-16646230 ]

Rinat Sharipov commented on FLINK-9592:
---------------------------------------

Hi [~kkl0u], during our migration to the latest Flink we decided to try to contribute functionality that adds the ability to hook into state changes in the bucketing sink, so a PR with this feature is available.

We are very interested in having this feature in BucketingSink and are always open for discussion.

Thx!

> Notify on moving file into pending/ final state
> -----------------------------------------------
>
>                 Key: FLINK-9592
>                 URL: https://issues.apache.org/jira/browse/FLINK-9592
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: Rinat Sharipov
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>
> Hi mates, I got a proposal about functionality of BucketingSink.
>
> During implementation of one of our tasks we got the following need: create a meta-file with the path and additional information about the file created by BucketingSink when it's been moved into its final place.
> Unfortunately such behaviour is currently not available for us.
>
> We've implemented our own Sink that provides an opportunity to register notifiers, which are called when the file state changes, but the current API doesn't allow us to add such behaviour using inheritance ...
>
> It seems that such functionality could be useful and could be a part of the BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: [r.sharipov@cleverdata.ru|mailto:a.totmakov@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> CleverDATA
> make your data clever
>
> ------------------------------------------------------------------------------------------------------------------------
>
> Hi,
> I see that could be a useful feature. What exactly is now preventing you from inheriting from BucketingSink? Maybe it would be enough to make the BucketingSink more easily extendable.
> One thing that could collide with such a feature is that Kostas is now working on a larger BucketingSink rework/refactor.
> Piotrek
> ________________________________________________________________________
>
> Hi guys, thx for your reply.
> The following code info is valid for the *release-1.5.0 tag, org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>
> For now, BucketingSink has the following lifecycle of files
>
> When moving files from opened to pending state:
> # on each item (*method* *invoke:434* *line*), we check that a suitable bucket exists and contains an opened file; if the opened file doesn't exist, we create one and write the item to it
> # on each item (*method* *invoke:434* *line*), we check that the opened file doesn't exceed the limits, and if the limits are exceeded, we close it and move it into pending state using *closeCurrentPartFile:568 line - private method*
> # on each timer request (*onProcessingTime:482 line*), we check whether items haven't been added to the opened file for longer than the specified period of time, and if so we close it using the same private method *closeCurrentPartFile:588 line*
>
> So the only way we have is to call our hook from *closeCurrentPartFile*, which is private, so we copy-pasted the current impl and injected our logic there
>
>
> Files move from pending state into final during the checkpointing lifecycle, in *notifyCheckpointComplete:657 line*, which is public and contains a lot of logic, including discovery of files in pending state, synchronization of state access and its modification, etc …
>
> So we couldn't override it, or call the super method and add some logic, because when the current impl changes the state of files, it removes them from state, and we have no way to know which files have changed state.
>
> To solve this problem, we've created the following interface
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>   /**
>    * Used to perform any additional operations, related with moving of file into next state.
>    *
>    * @param fs provides access for working with file system
>    * @param path path to the file, moved into next state
>    *
>    * @throws IOException if something went wrong, while performing any operations with file system
>    */
>   void call(FileSystem fs, Path path) throws IOException;
> }
>
> And we have added the ability to register these callbacks in the BucketingSink impl in the following manner
>
> public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
>
> I'm ready to discuss the best ways such hooks could be implemented in the core impl, or any other improvements that would help us add such functionality to our extension using the public API instead of copy-pasting the source code.
>
> Thx for your help, mates =)
>
> Sincerely yours,
> *Rinat Sharipov*
>
> ________________________________________________________________________
>
> Hi,
>
> Couple of things:
>
> 1. Please create a Jira ticket with this proposal, so we can move the discussion from the user mailing list.
>
> I haven't thought it through, so take my comments with a grain of salt, however:
>
> 2. If we were to go with such a callback, I would prefer to have one BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, `onPendingToFinal`, `onPendingToCancelled(…)`, etc, as opposed to having one interface passed three or four times for different purposes.
>
> 3. Another thing I had in mind is that BucketingSink could be rewritten to extend TwoPhaseCommitSinkFunction. In that case, with
>
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
>
> the user could add their own hooks by overriding the following methods
>
> BucketingSink2#beginTransaction, BucketingSink2#preCommit, BucketingSink2#commit, BucketingSink2#abort. For example:
>
> public class MyBucketingSink extends BucketingSink2 {
>   @Override
>   protected void commit(??? txn) {
>     super.commit(txn);
>     // My hook on moving file from pending to commit state
>   }
> }
>
> Alternatively, we could implement the aforementioned callback support in TwoPhaseCommitSinkFunction and provide such a feature to Kafka/Pravega/BucketingSink at once.
>
> Piotrek
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
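
For readers of the archive, point 2 of the quoted reply (one callback interface with a method per transition, rather than one interface registered several times) could be sketched roughly as below. This is only an illustration under stated assumptions, not code from Flink or from the PR: the method names come from the thread, but their signatures, the PartFileLifecycle stand-in for the sink, the RecordingCallback hook, and the use of java.nio.file.Path instead of the sink's file system types are all hypothetical.

```java
import java.io.Serializable;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Drives a toy part-file lifecycle and prints every transition the callback saw. */
final class BucketCallbackDemo {
    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        PartFileLifecycle lifecycle = new PartFileLifecycle(callback);

        lifecycle.closePartFile(Paths.get("part-0-0")); // in-progress -> pending
        lifecycle.checkpointComplete();                 // pending -> final

        callback.events().forEach(System.out::println);
    }
}

/**
 * Hypothetical single callback with one method per transition. Method names
 * follow the thread; the signatures are assumptions made for this sketch.
 */
interface BucketStateChangeCallback extends Serializable {
    default void onInProgressToPending(Path path) {}
    default void onPendingToFinal(Path path) {}
    default void onPendingToCancelled(Path path) {}
}

/** Toy stand-in for the sink (not Flink code): it only tracks pending part files. */
final class PartFileLifecycle {
    private final BucketStateChangeCallback callback;
    private final List<Path> pending = new ArrayList<>();

    PartFileLifecycle(BucketStateChangeCallback callback) {
        this.callback = callback;
    }

    /** Closes an in-progress part file and marks it pending. */
    void closePartFile(Path path) {
        pending.add(path);
        callback.onInProgressToPending(path);
    }

    /** Promotes every pending file to final, as a completed checkpoint would. */
    void checkpointComplete() {
        // The hook is told about each file before it is dropped from the
        // tracked state, so it knows exactly which files changed state.
        for (Path path : new ArrayList<>(pending)) {
            callback.onPendingToFinal(path);
        }
        pending.clear();
    }

    /** Drops every pending file, for example when the pending work is aborted. */
    void cancelPending() {
        for (Path path : new ArrayList<>(pending)) {
            callback.onPendingToCancelled(path);
        }
        pending.clear();
    }
}

/** Example hook that records transitions; a real hook might write a meta-file. */
final class RecordingCallback implements BucketStateChangeCallback {
    private final List<String> events = new ArrayList<>();

    @Override
    public void onInProgressToPending(Path path) {
        events.add("pending:" + path);
    }

    @Override
    public void onPendingToFinal(Path path) {
        events.add("final:" + path);
    }

    @Override
    public void onPendingToCancelled(Path path) {
        events.add("cancelled:" + path);
    }

    List<String> events() {
        return events;
    }
}
```

The default no-op methods let a hook override only the transitions it cares about, which is the practical difference from registering the same single-method interface once per transition.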