flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: BucketingSink never closed
Date Fri, 08 Sep 2017 15:29:55 GMT
Hi,

Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem is that the
job is shutting down before a last checkpoint can "confirm" the written bucket data by moving
it to the final state. The problem, as Kostas noted is that a user function (and thus also
BucketingSink) does not know whether close() is being called because of a failure or because
normal job shutdown. Therefore, we cannot move data to the final stage there.

Once we have the issue that Kostas posted resolve we can also resolve this problem for the
BucketingSink.

Best,
Aljoscha

> On 8. Sep 2017, at 16:48, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> 
> Hi Flavio,
> 
> If I understand correctly, I think you bumped into this issue: https://issues.apache.org/jira/browse/FLINK-2646
<https://issues.apache.org/jira/browse/FLINK-2646>
> 
> There is also a similar discussion on the BucketingSink here: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468>
> 
> Kostas
> 
>> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pompermaier@okkam.it <mailto:pompermaier@okkam.it>>
wrote:
>> 
>> Hi to all,
>> I'm trying to test a streaming job but the files written by the BucketingSink are
never finalized (remains into the pending state).
>> Is this caused by the fact that the job finishes before the checkpoint?
>> Shouldn't the sink properly close anyway?
>> 
>> This is my code:
>> 
>>   @Test
>>   public void testBucketingSink() throws Exception {
>>     final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>     final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
>>     senv.enableCheckpointing(5000);
>>     DataStream<String> testStream = senv.fromElements(//
>>         "1,aaa,white", //
>>         "2,bbb,gray", //
>>         "3,ccc,white", //
>>         "4,bbb,gray", //
>>         "5,bbb,gray" //
>>     );
>>     final RowTypeInfo rtf = new RowTypeInfo(
>>         BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.STRING_TYPE_INFO, 
>>         BasicTypeInfo.STRING_TYPE_INFO);
>>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>()
{
>> 
>>       private static final long serialVersionUID = 1L;
>> 
>>       @Override
>>       public Row map(String str) throws Exception {
>>         String[] split = str.split(Pattern.quote(","));
>>         Row ret = new Row(3);
>>         ret.setField(0, split[0]);
>>         ret.setField(1, split[1]);
>>         ret.setField(2, split[2]);
>>         return ret;
>>       }
>>     }).returns(rtf);
>> 
>>     String columnNames = "id,value,state";
>>     final String dsName = "test";
>>     tEnv.registerDataStream(dsName, rows, columnNames);
>>     final String whiteAreaFilter = "state = 'white'";
>>     DataStream<Row> grayArea = rows;
>>     DataStream<Row> whiteArea = null;
>>     if (whiteAreaFilter != null) {
>>       String sql = "SELECT *, (%s) as _WHITE FROM %s";
>>       sql = String.format(sql, whiteAreaFilter, dsName);
>>       Table table = tEnv.sql(sql);
>>       grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>>       DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames),
rtf);
>>       whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>>     }
>>     Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>> 
>>     String datasetWhiteDir = "/tmp/bucket/white";
>>     BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir.toString());
>>     whiteAreaSink.setWriter(bucketSinkwriter);
>>     whiteAreaSink.setBatchSize(10);
>>     whiteArea.addSink(whiteAreaSink);
>> 
>>     String datasetGrayDir = "/tmp/bucket/gray";
>>     BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir.toString());
>>     grayAreaSink.setWriter(bucketSinkwriter);
>>     grayAreaSink.setBatchSize(10);
>>     grayArea.addSink(grayAreaSink);
>> 
>>     JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
>>     System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
>>   }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> public class RowCsvWriter extends StreamWriterBase<Row> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   private final String charsetName;
>>   private transient Charset charset;
>>   private String fieldDelimiter;
>>   private String recordDelimiter;
>>   private boolean allowNullValues = true;
>>   private boolean quoteStrings = false;
>> 
>>   /**
>>    * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
strings to
>>    * bytes.
>>    */
>>   public RowCsvWriter() {
>>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>>   }
>> 
>>   /**
>>    * Creates a new {@code StringWriter} that uses the given charset to convert strings
to bytes.
>>    *
>>    * @param charsetName Name of the charset to be used, must be valid input for
>>    *        {@code Charset.forName(charsetName)}
>>    */
>>   public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter)
{
>>     this.charsetName = charsetName;
>>     this.fieldDelimiter = fieldDelimiter;
>>     this.recordDelimiter = recordDelimiter;
>>   }
>> 
>>   @Override
>>   public void open(FileSystem fs, Path path) throws IOException {
>>     super.open(fs, path);
>> 
>>     try {
>>       this.charset = Charset.forName(charsetName);
>>     } catch (IllegalCharsetNameException ex) {
>>       throw new IOException("The charset " + charsetName + " is not valid.", ex);
>>     } catch (UnsupportedCharsetException ex) {
>>       throw new IOException("The charset " + charsetName + " is not supported.",
ex);
>>     }
>>   }
>> 
>>   @Override
>>   public void write(Row element) throws IOException {
>>     FSDataOutputStream outputStream = getStream();
>>     writeRow(element, outputStream);
>>   }
>> 
>>   private void writeRow(Row element, FSDataOutputStream out) throws IOException {
>>     int numFields = element.getArity();
>> 
>>     for (int i = 0; i < numFields; i++) {
>>       Object obj = element.getField(i);
>>       if (obj != null) {
>>         if (i != 0) {
>>           out.write(this.fieldDelimiter.getBytes(charset));
>>         }
>> 
>>         if (quoteStrings) {
>>           if (obj instanceof String || obj instanceof StringValue) {
>>             out.write('"');
>>             out.write(obj.toString().getBytes(charset));
>>             out.write('"');
>>           } else {
>>             out.write(obj.toString().getBytes(charset));
>>           }
>>         } else {
>>           out.write(obj.toString().getBytes(charset));
>>         }
>>       } else {
>>         if (this.allowNullValues) {
>>           if (i != 0) {
>>             out.write(this.fieldDelimiter.getBytes(charset));
>>           }
>>         } else {
>>           throw new RuntimeException("Cannot write tuple with <null> value
at position: " + i);
>>         }
>>       }
>>     }
>> 
>>     // add the record delimiter
>>     out.write(this.recordDelimiter.getBytes(charset));
>>   }
>> 
>>   @Override
>>   public Writer<Row> duplicate() {
>>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>>   }
>> }
>> 
>> 
>> 
>> Any help is appreciated,
>> Flavio
> 


Mime
View raw message