flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: BucketingSink never closed
Date Fri, 08 Sep 2017 14:48:15 GMT
Hi Flavio,

If I understand correctly, I think you bumped into this issue: https://issues.apache.org/jira/browse/FLINK-2646
<https://issues.apache.org/jira/browse/FLINK-2646>

There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468>

Kostas

> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
> 
> Hi to all,
> I'm trying to test a streaming job but the files written by the BucketingSink are never
finalized (they remain in the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink properly close anyway?
> 
> This is my code:
> 
>   @Test
>   public void testBucketingSink() throws Exception {
>     final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>     final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
>     senv.enableCheckpointing(5000);
>     DataStream<String> testStream = senv.fromElements(//
>         "1,aaa,white", //
>         "2,bbb,gray", //
>         "3,ccc,white", //
>         "4,bbb,gray", //
>         "5,bbb,gray" //
>     );
>     final RowTypeInfo rtf = new RowTypeInfo(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO, 
>         BasicTypeInfo.STRING_TYPE_INFO);
>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>()
{
> 
>       private static final long serialVersionUID = 1L;
> 
>       @Override
>       public Row map(String str) throws Exception {
>         String[] split = str.split(Pattern.quote(","));
>         Row ret = new Row(3);
>         ret.setField(0, split[0]);
>         ret.setField(1, split[1]);
>         ret.setField(2, split[2]);
>         return ret;
>       }
>     }).returns(rtf);
> 
>     String columnNames = "id,value,state";
>     final String dsName = "test";
>     tEnv.registerDataStream(dsName, rows, columnNames);
>     final String whiteAreaFilter = "state = 'white'";
>     DataStream<Row> grayArea = rows;
>     DataStream<Row> whiteArea = null;
>     if (whiteAreaFilter != null) {
>       String sql = "SELECT *, (%s) as _WHITE FROM %s";
>       sql = String.format(sql, whiteAreaFilter, dsName);
>       Table table = tEnv.sql(sql);
>       grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>       DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames),
rtf);
>       whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>     }
>     Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
> 
>     String datasetWhiteDir = "/tmp/bucket/white";
>     BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir.toString());
>     whiteAreaSink.setWriter(bucketSinkwriter);
>     whiteAreaSink.setBatchSize(10);
>     whiteArea.addSink(whiteAreaSink);
> 
>     String datasetGrayDir = "/tmp/bucket/gray";
>     BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir.toString());
>     grayAreaSink.setWriter(bucketSinkwriter);
>     grayAreaSink.setBatchSize(10);
>     grayArea.addSink(grayAreaSink);
> 
>     JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
>     System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
>   }
> 
> 
> 
> 
> 
> 
> 
> public class RowCsvWriter extends StreamWriterBase<Row> {
>   private static final long serialVersionUID = 1L;
> 
>   private final String charsetName;
>   private transient Charset charset;
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
> 
>   /**
>    * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
strings to
>    * bytes.
>    */
>   public RowCsvWriter() {
>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
> 
>   /**
>    * Creates a new {@code StringWriter} that uses the given charset to convert strings
to bytes.
>    *
>    * @param charsetName Name of the charset to be used, must be valid input for
>    *        {@code Charset.forName(charsetName)}
>    */
>   public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter)
{
>     this.charsetName = charsetName;
>     this.fieldDelimiter = fieldDelimiter;
>     this.recordDelimiter = recordDelimiter;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     super.open(fs, path);
> 
>     try {
>       this.charset = Charset.forName(charsetName);
>     } catch (IllegalCharsetNameException ex) {
>       throw new IOException("The charset " + charsetName + " is not valid.", ex);
>     } catch (UnsupportedCharsetException ex) {
>       throw new IOException("The charset " + charsetName + " is not supported.", ex);
>     }
>   }
> 
>   @Override
>   public void write(Row element) throws IOException {
>     FSDataOutputStream outputStream = getStream();
>     writeRow(element, outputStream);
>   }
> 
>   private void writeRow(Row element, FSDataOutputStream out) throws IOException {
>     int numFields = element.getArity();
> 
>     for (int i = 0; i < numFields; i++) {
>       Object obj = element.getField(i);
>       if (obj != null) {
>         if (i != 0) {
>           out.write(this.fieldDelimiter.getBytes(charset));
>         }
> 
>         if (quoteStrings) {
>           if (obj instanceof String || obj instanceof StringValue) {
>             out.write('"');
>             out.write(obj.toString().getBytes(charset));
>             out.write('"');
>           } else {
>             out.write(obj.toString().getBytes(charset));
>           }
>         } else {
>           out.write(obj.toString().getBytes(charset));
>         }
>       } else {
>         if (this.allowNullValues) {
>           if (i != 0) {
>             out.write(this.fieldDelimiter.getBytes(charset));
>           }
>         } else {
>           throw new RuntimeException("Cannot write tuple with <null> value at position:
" + i);
>         }
>       }
>     }
> 
>     // add the record delimiter
>     out.write(this.recordDelimiter.getBytes(charset));
>   }
> 
>   @Override
>   public Writer<Row> duplicate() {
>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>   }
> }
> 
> 
> 
> Any help is appreciated,
> Flavio


Mime
View raw message