Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F2433200CF3 for ; Wed, 13 Sep 2017 12:04:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EF2B21609CA; Wed, 13 Sep 2017 10:04:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 70BC01609C9 for ; Wed, 13 Sep 2017 12:04:33 +0200 (CEST) Received: (qmail 56874 invoked by uid 500); 13 Sep 2017 10:04:32 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 56864 invoked by uid 99); 13 Sep 2017 10:04:32 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Sep 2017 10:04:32 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id B0FFBD45DE for ; Wed, 13 Sep 2017 10:04:31 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 3.294 X-Spam-Level: *** X-Spam-Status: No, score=3.294 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, URIBL_BLOCKED=0.001, URI_HEX=1.313] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=okkam-it.20150623.gappssmtp.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Xm_3pcrxVWjo for ; Wed, 13 Sep 2017 10:04:29 +0000 (UTC) Received: from mail-ua0-f177.google.com (mail-ua0-f177.google.com [209.85.217.177]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 2D7445FB4E for ; Wed, 13 Sep 2017 10:04:28 +0000 (UTC) Received: by mail-ua0-f177.google.com with SMTP id g47so18312917uad.0 for ; Wed, 13 Sep 2017 03:04:28 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=okkam-it.20150623.gappssmtp.com; s=20150623; h=mime-version:in-reply-to:references:from:date:message-id:subject:to :cc; bh=4NSMBU/dyP4dLzXA5ycJjFrEdT0kFiFBDHgY+M+6/h0=; b=ZPH3DuvicNXsOPKLPcmNzQ6njAs557EOjNVWbH6yN7SFciLH3LQcgjAlfFhj2/H7I5 JNnkYIQubFk5az3LLIhrjxS11Gmqp75N2S5dN7ohbag9AIvHBqWo9aRgcWcq5Flv3U5e c57VTdmj8HWYEvzOSIsETpIhHPv53OX0NmqQHO+Uie3YeSigkPlF82LZHO5s46Av7NkC n2kdWFo2MfiQx5o+Bhxh206hrvv6qh+AtWHvCDW1SajEo8J6mssQVHbM3Au4wCuA0XF1 G71f/3xZkGZpy//IROx1hlGStiOGu+NXXfB7ywC+Ik6+zIZDsOYewlbNRQvBf3dj8IOm djDw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to:cc; bh=4NSMBU/dyP4dLzXA5ycJjFrEdT0kFiFBDHgY+M+6/h0=; b=Ha7CvSC6tvsD0nRFLZtfVuR/aDzcjMaQ+Au8Tf+SSRB64B3tf9qQpNAAO4yU6DJ2zP WfodTGq4AUlFXWuSA5OJ8SLFDHEUibZp+ISm6grwAHchl3wr7ulv5m+nk8rYTQA0lN6j UkJkxAqLi6hjcUrKvOcOFBS0DmjQTExU7VJEIK+4YzxH9WCjuyRbjNPdS8T+/VG7K/6t oGJzNG8DTL16tjzmyGJSgVjmPsttXX+I8uj11dyed/4gyzh1UOT10Z5wXsIzjjbkhLTm /6x0/UfHBemTsG5TG0gDRomfEaEYzOL7U6bRi4+SUe88QqNVL40+2ZfIN/dwXyrXUd83 eclg== X-Gm-Message-State: AHPjjUj5RcgTB9Bhqnx6OkPxRTcQgVrQMtqD6z+VN+mmFWCr7HWFBkS8 8jzRAUKmwKazoJntcPEFaeTUG2ZEZBa67WBHp7W3JQ== X-Google-Smtp-Source: AOwi7QCMZGXEt/wCs8N/mOWZikf5dnuCNi1ktPIW5441y2gVLva/YsbSopYefUvPI23lr+mX8NiTS+m+nrlyH01EXI0= X-Received: by 10.176.1.66 with SMTP id 60mr12852013uak.48.1505297066785; Wed, 13 Sep 2017 03:04:26 -0700 (PDT) MIME-Version: 1.0 Received: by 10.159.33.103 with HTTP; Wed, 13 Sep 2017 03:04:06 -0700 (PDT) X-Originating-IP: [77.43.114.114] In-Reply-To: References: <87FCF441-EE2B-4EEA-9826-A1D1252AA6F9@data-artisans.com> From: Flavio Pompermaier Date: Wed, 13 Sep 2017 12:04:06 +0200 Message-ID: Subject: Re: BucketingSink never closed To: "Tzu-Li (Gordon) Tai" Cc: Aljoscha Krettek , Kostas Kloudas , user Content-Type: multipart/alternative; boundary="001a113cfd2cf28f3605590f489e" archived-at: Wed, 13 Sep 2017 10:04:35 -0000 --001a113cfd2cf28f3605590f489e Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: quoted-printable Hi Gordon, thanks for your feedback. The main problem for me is that moving from batch to stream should be much easier IMHO. Rows should be a first class citizen in Flink and should be VERY easy to read/write them, while at the moment it seems that Tuples are the dominating type...I don't want to write a serializer/outputFormat to persist Rows as Parquet, Avro, Thrift, OCR, Kudu, Hive, etc..I expect to have some already existing (and mantained) connector already available somewhere. The case of the Parquet Rollink sink is just an example. Regarding state backends I think that its not so easy to understand how to design and monitor it properly: there are many parameters/variables to take into account and it would be helpful to have a proper hands-on training course/certification about this... About ES indexing monitoring see my discussion with Chesnay at http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streami= ng-job-monitoring-td13583.html: what I need is just to have recordsIn/recordsOut reflecting real values. Best, Flavio On Wed, Sep 13, 2017 at 10:56 AM, Tzu-Li (Gordon) Tai wrote: > Hi Flavio, > > Let me try to understand / look at some of the problems you have > encountered. > > > - checkpointing: it's not clear which checkpointing system to use and > how to tune/monitor it and avoid OOM exceptions. > > What do you mean be which "checkpointing system=E2=80=9D to use? Do you m= ean state > backends? Typically, you would only get OOM exceptions for memory-backed > state backends if the state size exceeds the memory capacity. State sizes > can be queried from the REST APIs / Web UI. > > > - cleanup: BucketingSink doesn't always move to final state > > This sounds like a bug that we should look into. Do you have any logs on > which you observed this? > > > - missing output formats: parquet support to write generic Rows not > very well supported (at least out of the box) [1] > > Would you be up to opening up JIRAs for what you think is missing (if > there isn=E2=80=99t one already)? > > > - progress monitoring: for example in the ES connector there's no way > (apart from using accumulators) to monitor the progress of the indexin= g > > Maybe we can add some built-in metric in the ES sink connector that track= s > the number of successfully indexed elements, which can then be queried fr= om > the REST API / Web UI. That wouldn=E2=80=99t be too much effort. What do = you think, > would that be useful for your case? > Would be happy to hear your thoughts on this! > > Cheers, > Gordon > > > On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier ( > pompermaier@okkam.it) wrote: > > For the moment I give up with streaming...too many missing/unclear > features wrt batch. > For example: > > - checkpointing: it's not clear which checkpointing system to use and > how to tune/monitor it and avoid OOM exceptions. Moreover is it really > necessary to use it? For example if I read a file from HDFS and I don'= t > have a checkpoint it could be ok to re-run the job on all the data in = case > of errors (i.e. the stream is managed like a batch) > - cleanup: BucketingSink doesn't always move to final state > - missing output formats: parquet support to write generic Rows not > very well supported (at least out of the box) [1] > - progress monitoring: for example in the ES connector there's no way > (apart from using accumulators) to monitor the progress of the indexin= g > > [1] https://stackoverflow.com/questions/41144659/flink-avro- > parquet-writer-in-rollingsink > > Maybe I'm wrong with those points but the attempt to replace my current > batch system with a streaming one had no luck with those points. > > Best, > Flavio > > On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek > wrote: > >> Hi, >> >> Expanding a bit on Kostas' answer. Yes, your analysis is correct, the >> problem is that the job is shutting down before a last checkpoint can >> "confirm" the written bucket data by moving it to the final state. The >> problem, as Kostas noted is that a user function (and thus also >> BucketingSink) does not know whether close() is being called because of = a >> failure or because normal job shutdown. Therefore, we cannot move data t= o >> the final stage there. >> >> Once we have the issue that Kostas posted resolve we can also resolve >> this problem for the BucketingSink. >> >> Best, >> Aljoscha >> >> On 8. Sep 2017, at 16:48, Kostas Kloudas >> wrote: >> >> Hi Flavio, >> >> If I understand correctly, I think you bumped into this issue: >> https://issues.apache.org/jira/browse/FLINK-2646 >> >> There is also a similar discussion on the BucketingSink here: >> http://apache-flink-mailing-list-archive.1008284.n3.nabble. >> com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction- >> td14466.html#a14468 >> >> Kostas >> >> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier >> wrote: >> >> Hi to all, >> I'm trying to test a streaming job but the files written by >> the BucketingSink are never finalized (remains into the pending state). >> Is this caused by the fact that the job finishes before the checkpoint? >> Shouldn't the sink properly close anyway? >> >> This is my code: >> >> @Test >> public void testBucketingSink() throws Exception { >> final StreamExecutionEnvironment senv =3D StreamExecutionEnvironment= .get >> ExecutionEnvironment(); >> final StreamTableEnvironment tEnv =3D TableEnvironment.getTableEnvir >> onment(senv); >> senv.enableCheckpointing(5000); >> DataStream testStream =3D senv.fromElements(// >> "1,aaa,white", // >> "2,bbb,gray", // >> "3,ccc,white", // >> "4,bbb,gray", // >> "5,bbb,gray" // >> ); >> final RowTypeInfo rtf =3D new RowTypeInfo( >> BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.STRING_TYPE_INFO); >> DataStream rows =3D testStream.map(new MapFunction= () { >> >> private static final long serialVersionUID =3D 1L; >> >> @Override >> public Row map(String str) throws Exception { >> String[] split =3D str.split(Pattern.quote(",")); >> Row ret =3D new Row(3); >> ret.setField(0, split[0]); >> ret.setField(1, split[1]); >> ret.setField(2, split[2]); >> return ret; >> } >> }).returns(rtf); >> >> String columnNames =3D "id,value,state"; >> final String dsName =3D "test"; >> tEnv.registerDataStream(dsName, rows, columnNames); >> final String whiteAreaFilter =3D "state =3D 'white'"; >> DataStream grayArea =3D rows; >> DataStream whiteArea =3D null; >> if (whiteAreaFilter !=3D null) { >> String sql =3D "SELECT *, (%s) as _WHITE FROM %s"; >> sql =3D String.format(sql, whiteAreaFilter, dsName); >> Table table =3D tEnv.sql(sql); >> grayArea =3D tEnv.toDataStream(table.where( >> "!_WHITE").select(columnNames), rtf); >> DataStream nw =3D tEnv.toDataStream(table.where("_WHITE").sel= ect(columnNames), >> rtf); >> whiteArea =3D whiteArea =3D=3D null ? nw : whiteArea.union(nw); >> } >> Writer bucketSinkwriter =3D new RowCsvWriter("UTF-8", "\t", "\n= "); >> >> String datasetWhiteDir =3D "/tmp/bucket/white"; >> BucketingSink whiteAreaSink =3D new BucketingSink<>(datasetWhit= eDi >> r.toString()); >> whiteAreaSink.setWriter(bucketSinkwriter); >> whiteAreaSink.setBatchSize(10); >> whiteArea.addSink(whiteAreaSink); >> >> String datasetGrayDir =3D "/tmp/bucket/gray"; >> BucketingSink grayAreaSink =3D new BucketingSink<>(datasetGrayD= ir >> .toString()); >> grayAreaSink.setWriter(bucketSinkwriter); >> grayAreaSink.setBatchSize(10); >> grayArea.addSink(grayAreaSink); >> >> JobExecutionResult jobInfo =3D senv.execute("Buketing sink test "); >> System.out.printf("Job took %s minutes", >> jobInfo.getNetRuntime(TimeUnit.MINUTES)); >> } >> >> >> >> >> >> >> >> public class RowCsvWriter extends StreamWriterBase { >> private static final long serialVersionUID =3D 1L; >> >> private final String charsetName; >> private transient Charset charset; >> private String fieldDelimiter; >> private String recordDelimiter; >> private boolean allowNullValues =3D true; >> private boolean quoteStrings =3D false; >> >> /** >> * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charse= t >> to convert strings to >> * bytes. >> */ >> public RowCsvWriter() { >> this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, >> CsvOutputFormat.DEFAULT_LINE_DELIMITER); >> } >> >> /** >> * Creates a new {@code StringWriter} that uses the given charset to >> convert strings to bytes. >> * >> * @param charsetName Name of the charset to be used, must be valid >> input for >> * {@code Charset.forName(charsetName)} >> */ >> public RowCsvWriter(String charsetName, String fieldDelimiter, String >> recordDelimiter) { >> this.charsetName =3D charsetName; >> this.fieldDelimiter =3D fieldDelimiter; >> this.recordDelimiter =3D recordDelimiter; >> } >> >> @Override >> public void open(FileSystem fs, Path path) throws IOException { >> super.open(fs, path); >> >> try { >> this.charset =3D Charset.forName(charsetName); >> } catch (IllegalCharsetNameException ex) { >> throw new IOException("The charset " + charsetName + " is not >> valid.", ex); >> } catch (UnsupportedCharsetException ex) { >> throw new IOException("The charset " + charsetName + " is not >> supported.", ex); >> } >> } >> >> @Override >> public void write(Row element) throws IOException { >> FSDataOutputStream outputStream =3D getStream(); >> writeRow(element, outputStream); >> } >> >> private void writeRow(Row element, FSDataOutputStream out) throws >> IOException { >> int numFields =3D element.getArity(); >> >> for (int i =3D 0; i < numFields; i++) { >> Object obj =3D element.getField(i); >> if (obj !=3D null) { >> if (i !=3D 0) { >> out.write(this.fieldDelimiter.getBytes(charset)); >> } >> >> if (quoteStrings) { >> if (obj instanceof String || obj instanceof StringValue) { >> out.write('"'); >> out.write(obj.toString().getBytes(charset)); >> out.write('"'); >> } else { >> out.write(obj.toString().getBytes(charset)); >> } >> } else { >> out.write(obj.toString().getBytes(charset)); >> } >> } else { >> if (this.allowNullValues) { >> if (i !=3D 0) { >> out.write(this.fieldDelimiter.getBytes(charset)); >> } >> } else { >> throw new RuntimeException("Cannot write tuple with >> value at position: " + i); >> } >> } >> } >> >> // add the record delimiter >> out.write(this.recordDelimiter.getBytes(charset)); >> } >> >> @Override >> public Writer duplicate() { >> return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter= ); >> } >> } >> >> >> >> Any help is appreciated, >> Flavio >> >> >> >> > --001a113cfd2cf28f3605590f489e Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Hi Gordon,=C2=A0
thanks for your feedback. The main pr= oblem for me is that moving from batch to stream should be much easier IMHO= .

Rows should be a first class citizen in Flink an= d should be VERY easy to read/write them, while at the moment it seems that= Tuples are the dominating type...I don't want to write a serializer/ou= tputFormat to persist Rows as Parquet, Avro, Thrift, OCR, Kudu, Hive, etc..= I expect to have some already existing (and mantained) connector already av= ailable somewhere. The case of the Parquet Rollink sink is just an example.=

Regarding state backends I think that its not so = easy to understand how to design and monitor it properly: there are many pa= rameters/variables to take into account and it would be helpful to have a p= roper hands-on=C2=A0training course/certification about this...
<= br>
About ES indexing monitoring see my discussion with=C2=A0Ches= nay=C2=A0at=C2=A0http://apache-f= link-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-job-monitori= ng-td13583.html: what I need is just to have recordsIn/recordsOut refle= cting real values.

Best,
Flavio

On Wed, Sep 13, 2017= at 10:56 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wr= ote:
=
Hi Flavio,

Let me try to understand / look at some= of the problems you have encountered.
  • checkpointing: it's not clear which checkpointing system to = use and how to tune/monitor it and avoid OOM exceptions.
What do you mean be which "checkpointing s= ystem=E2=80=9D to use? Do you mean state backends? Typically, you would onl= y get OOM exceptions for memory-backed state backends if the state size exc= eeds the memory capacity. State sizes can be queried from the REST APIs / W= eb UI.
  • cleanup: BucketingS= ink doesn't always move to final state

This sounds like a bug that we should look into. Do you have an= y logs on which you observed this?

<= ul>
  • missing output formats: parquet support to write generic Rows not ve= ry well supported (at least out of the box) [1]
  • Would you be up to opening up JIRAs for what you think is = missing (if there isn=E2=80=99t one already)?

    • progress monitoring: for example in the ES connector ther= e's no way (apart from using accumulators) to monitor the progress of t= he indexing

    Maybe we can add so= me built-in metric in the ES sink connector that tracks the number of succe= ssfully indexed elements, which can then be queried from the REST API / Web= UI. That wouldn=E2=80=99t be too much effort. What do you think, would tha= t be useful for your case?

    Would be happy to hear your thoughts on = this!

    Cheers,
    Gordon


    On 12 September 2017 at = 11:36:27 AM, Flavio Pompermaier (pompermaier@okkam.it) wrote:

    For the moment I give up with streaming...too many missing/unclear features wrt batch.=C2=A0
    For example:
    • checkpointing: it's not clear which checkpointing system to use and how to tune/monitor it and avoid OOM exceptions. Moreover is it really necessary to use it? For example if I read a file from HDFS and I don't have a checkpoint it could be ok to re-run the job on all the data in case of errors (i.e. the stream is managed like a batch)
    • cleanup: BucketingSink doesn't always move to final state
    • missing output formats: parquet support to write generic Rows not very well supported (at least out of the box) [1]
    • progress monitoring: for example in the ES connector there's no way (apart from using accumulators) to monitor the progress of the indexing

    Maybe I'm wrong with those points but the attempt to replace my current batch system with a streaming one had no luck with those points.

    Best,
    Flavio

    On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
    Hi,

    Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem is that the job is shutting down before a last checkpoint can "confirm" the written bucket data by moving it to the final state. The problem, as Kostas noted is that a user function (and thus also BucketingSink) does not know whether close() is being called because of a failure or because normal job shutdown. Therefore, we cannot move data to the final stage there.

    Once we have the issue that Kostas posted resolve we can also resolve this problem for the BucketingSink.

    Best,
    Aljoscha

    On 8. Sep 2017, at 16:48, Kostas Kloudas <k.kloudas@data-artisans.com>= wrote:

    Hi Flavio,

    If I understand correctly, I think you bumped into this issue:=C2=A0https://issues.apache.org/jira/browse/FLINK-2646

    There is also a similar discussion on the BucketingSink here:=C2=A0

    Kostas

    On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:<= /div>
    Hi to all,
    I'm trying to test a streaming job but the files written by the=C2=A0BucketingSink are never finalized (remains into the pending state).
    Is this caused by the fact that the job finishes before the checkpoint?
    Shouldn't the sink properly close anyway?

    This is my code:

    =C2=A0 @Test
    =C2=A0 public void testBucketingSink() throws Exception {
    =C2=A0 =C2=A0 final StreamExecutionEnvironment senv =3D StreamExecutionEnvironment.getExecutionEnvironment();
    =C2=A0 =C2=A0 final StreamTableEnvironment tEnv =3D TableEnvironment.getTableEnvironment(senv);
    =C2=A0 =C2=A0 senv.enableCheckpointing(5000);
    =C2=A0 =C2=A0 DataStream<String> testStream =3D senv.fromElements(//
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 "1,aaa,white", //
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 "2,bbb,gray", //
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 "3,ccc,white", //
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 "4,bbb,gray", //
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 "5,bbb,gray" //
    =C2=A0 =C2=A0 );
    =C2=A0 =C2=A0 final RowTypeInfo rtf =3D new RowTypeInfo(
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 BasicTypeInfo.STRING_TYPE_INFO,
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 BasicTypeInfo.STRING_TYPE_INFO,=C2=A0
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 BasicTypeInfo.STRING_TYPE_INFO);
    =C2=A0 =C2=A0 DataStream<Row> rows =3D testStream.map(new MapFunction<String, Row>() {

    =C2=A0 =C2=A0 =C2=A0 private static final long serialVersionUID =3D 1L;

    =C2=A0 =C2=A0 =C2=A0 @Override
    =C2=A0 =C2=A0 =C2=A0 public Row map(String str) throws Exception {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 String[] split =3D str.split(Pattern.quote(","));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 Row ret =3D new Row(3);
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 ret.setField(0, split[0]);
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 ret.setField(1, split[1]);
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 ret.setField(2, split[2]);
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 return ret;
    =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 }).returns(rtf);

    =C2=A0 =C2=A0 String columnNames =3D "id,value,state";
    =C2=A0 =C2=A0 final String dsName =3D "test";
    =C2=A0 =C2=A0 tEnv.registerDataStream(dsName, rows, columnNames);
    =C2=A0 =C2=A0 final String whiteAreaFilter =3D "state =3D 'white'";
    =C2=A0 =C2=A0 DataStream<Row> grayArea =3D rows;
    =C2=A0 =C2=A0 DataStream<Row> whiteArea =3D null;
    =C2=A0 =C2=A0 if (whiteAreaFilter !=3D null) {
    =C2=A0 =C2=A0 =C2=A0 String sql =3D "SELECT *, (%s) as _WHITE FROM %s";
    =C2=A0 =C2=A0 =C2=A0 sql =3D String.format(sql, whiteAreaFilter, dsName);
    =C2=A0 =C2=A0 =C2=A0 Table table =3D tEnv.sql(sql);
    =C2=A0 =C2=A0 =C2=A0 grayArea =3D tEnv.toDataStream(table.where("!_WHITE").select(columnNames)= , rtf);
    =C2=A0 =C2=A0 =C2=A0 DataStream<Row> nw =3D tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
    =C2=A0 =C2=A0 =C2=A0 whiteArea =3D whiteArea =3D=3D null ? nw : whiteArea.union(nw);
    =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 Writer<Row> bucketSinkwriter =3D new RowCsvWriter("UTF-8", "\t", "\n");

    =C2=A0 =C2=A0 String datasetWhiteDir =3D "/tmp/bucket/white";
    =C2=A0 =C2=A0 BucketingSink<Row> whiteAreaSink =3D new BucketingSink<>(datasetWhiteDir.toString());
    =C2=A0 =C2=A0 whiteAreaSink.setWriter(bucketSinkwriter);
    =C2=A0 =C2=A0 whiteAreaSink.setBatchSize(10);
    =C2=A0 =C2=A0 whiteArea.addSink(whiteAreaSink);

    =C2=A0 =C2=A0 String datasetGrayDir =3D "/tmp/bucket/gray";
    =C2=A0 =C2=A0 BucketingSink<Row> grayAreaSink =3D new BucketingSink<>(datasetGrayDir.toString());
    =C2=A0 =C2=A0 grayAreaSink.setWriter(bucketSinkwriter);
    =C2=A0 =C2=A0 grayAreaSink.setBatchSize(10);
    =C2=A0 =C2=A0 grayArea.addSink(grayAreaSink);

    =C2=A0 =C2=A0 JobExecutionResult jobInfo =3D senv.execute("Buketing sink test ");
    =C2=A0 =C2=A0 System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
    =C2=A0 }







    public class RowCsvWriter extends StreamWriterBase<Row> {
    =C2=A0 private static final long serialVersionUID =3D 1L;

    =C2=A0 private final String charsetName;
    =C2=A0 private transient Charset charset;
    =C2=A0 private String fieldDelimiter;
    =C2=A0 private String recordDelimiter;
    =C2=A0 private boolean allowNullValues =3D true;
    =C2=A0 private boolean quoteStrings =3D false;

    =C2=A0 /**
    =C2=A0 =C2=A0* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert strings to
    =C2=A0 =C2=A0* bytes.
    =C2=A0 =C2=A0*/
    =C2=A0 public RowCsvWriter() {
    =C2=A0 =C2=A0 this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
    =C2=A0 }

    =C2=A0 /**
    =C2=A0 =C2=A0* Creates a new {@code StringWriter} that uses the given charset to convert strings to bytes.
    =C2=A0 =C2=A0*
    =C2=A0 =C2=A0* @param charsetName Name of the charset to be used, must be valid input for
    =C2=A0 =C2=A0* =C2=A0 =C2=A0 =C2=A0 =C2=A0{@code Charset.forName(charsetName)}
    =C2=A0 =C2=A0*/
    =C2=A0 public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
    =C2=A0 =C2=A0 this.charsetName =3D charsetName;
    =C2=A0 =C2=A0 this.fieldDelimiter =3D fieldDelimiter;
    =C2=A0 =C2=A0 this.recordDelimiter =3D recordDelimiter;
    =C2=A0 }

    =C2=A0 @Override
    =C2=A0 public void open(FileSystem fs, Path path) throws IOException {
    =C2=A0 =C2=A0 super.open(fs, path);

    =C2=A0 =C2=A0 try {
    =C2=A0 =C2=A0 =C2=A0 this.charset =3D Charset.forName(charsetName);
    =C2=A0 =C2=A0 } catch (IllegalCharsetNameException ex) {
    =C2=A0 =C2=A0 =C2=A0 throw new IOException("The charset " + charsetName + " is not valid.", ex);
    =C2=A0 =C2=A0 } catch (UnsupportedCharsetException ex) {
    =C2=A0 =C2=A0 =C2=A0 throw new IOException("The charset " + charsetName + " is not supported.", ex);
    =C2=A0 =C2=A0 }
    =C2=A0 }

    =C2=A0 @Override
    =C2=A0 public void write(Row element) throws IOException {
    =C2=A0 =C2=A0 FSDataOutputStream outputStream =3D getStream();
    =C2=A0 =C2=A0 writeRow(element, outputStream);
    =C2=A0 }

    =C2=A0 private void writeRow(Row element, FSDataOutputStream out) throws IOException {
    =C2=A0 =C2=A0 int numFields =3D element.getArity();

    =C2=A0 =C2=A0 for (int i =3D 0; i < numFields; i++) {
    =C2=A0 =C2=A0 =C2=A0 Object obj =3D element.getField(i);
    =C2=A0 =C2=A0 =C2=A0 if (obj !=3D null) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (i !=3D 0) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write(this.fieldDelimiter.getBytes(charset));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 }

    =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (quoteStrings) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (obj instanceof String || obj instanceof StringValue) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write('"');
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write(obj.toString().getBytes(charset));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write('"');
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 } else {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write(obj.toString().getBytes(charset));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 } else {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write(obj.toString().getBytes(charset));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 =C2=A0 } else {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (this.allowNullValues) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (i !=3D 0) {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 out.write(this.fieldDelimiter.getBytes(charset));
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 } else {
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
    =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 =C2=A0 }
    =C2=A0 =C2=A0 }

    =C2=A0 =C2=A0 // add the record delimiter
    =C2=A0 =C2=A0 out.write(this.recordDelimiter.getBytes(charset));
    =C2=A0 }

    =C2=A0 @Override
    =C2=A0 public Writer<Row> duplicate() {
    =C2=A0 =C2=A0 return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
    =C2=A0 }
    }



    Any help is appreciated,
    Flavio




    --001a113cfd2cf28f3605590f489e--