flink-user mailing list archives

From Rinat <r.shari...@cleverdata.ru>
Subject Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified
Date Sat, 23 Jun 2018 18:28:23 GMT
Hi mates, could anyone please take a look at my PR that fixes the incorrect indexing of part files
in the BucketingSink component?

Thx

> On 18 Jun 2018, at 10:55, Rinat <r.sharipov@cleverdata.ru> wrote:
> 
> I’ve created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-9603, and added a proposed fix as a PR.
> 
> Thx
> 
>> On 16 Jun 2018, at 17:21, Rinat <r.sharipov@cleverdata.ru> wrote:
>> 
>> Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix for part files.
>> This is very useful when the files need a specific extension.
>> 
>> While using it, I’ve found an issue: when a new part file is created, it gets the same part
>> index as the file that was just closed.
>> So when Flink tries to move the new file into its final state, it fails with a FileAlreadyExistsException.
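The collision itself can be reproduced on a local disk. This is only an analogy: the sketch uses the JDK's java.nio.file API instead of the Hadoop/Flink file system that BucketingSink actually works with, and the file names (a finished part-0-0.txt plus a pending _part-0-0.txt.pending that reuses index 0) are assumed for illustration.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Local-disk analogy only: java.nio.file stands in for the file system BucketingSink uses.
// The "_<name>.pending" naming of the pending file is an assumption for illustration.
public class PartFileCollision {

    // Returns true if finalizing the new file fails because the target name is taken.
    static boolean collides() throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        // A part file that was already closed and moved into its final state.
        Files.createFile(bucket.resolve("part-0-0.txt"));
        // A new pending file that was given the same index 0.
        Path pending = Files.createFile(bucket.resolve("_part-0-0.txt.pending"));
        try {
            // Without REPLACE_EXISTING, Files.move refuses to overwrite an existing target.
            Files.move(pending, bucket.resolve("part-0-0.txt"));
            return false;
        } catch (FileAlreadyExistsException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("collision: " + collides());
    }
}
```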
>> 
>> The problem is in the following code.
>> Here we try to find the first part index that is not yet used in the bucket directory. The
>> problem is that partSuffix is not included when the path is assembled, so the checked path
>> never exists and partCounter is never incremented.
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>       fs.exists(getPendingPathFor(partPath)) ||
>>       fs.exists(getInProgressPathFor(partPath))) {
>>    bucketState.partCounter++;
>>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> }
>> 
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
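To make the effect of the loop above concrete, here is a simplified in-memory model of it (not Flink code). The file system is a set of file names, and the "_" prefix with ".pending" / ".in-progress" suffixes for pending and in-progress files are assumed values. With a finished part-0-0.txt already in the bucket, the search still picks index 0.

```java
import java.util.Set;

// Simplified model of the quoted part-index search; not Flink code.
// The "_" prefix and ".pending" / ".in-progress" suffixes are assumed values.
public class BuggyPartIndexSearch {

    static String pending(String name)    { return "_" + name + ".pending"; }
    static String inProgress(String name) { return "_" + name + ".in-progress"; }

    // Mirrors the original loop: the candidate name is built WITHOUT the part suffix.
    static int nextPartIndex(Set<String> fs, String prefix, int subtask, int counter) {
        String part = prefix + "-" + subtask + "-" + counter;
        while (fs.contains(part) || fs.contains(pending(part)) || fs.contains(inProgress(part))) {
            counter++;
            part = prefix + "-" + subtask + "-" + counter;
        }
        return counter;
    }

    public static void main(String[] args) {
        // A finished part file that was written with the ".txt" suffix.
        Set<String> fs = Set.of("part-0-0.txt");
        int index = nextPartIndex(fs, "part", 0, 0);
        // "part-0-0" never matches "part-0-0.txt", so index 0 is chosen again.
        System.out.println("next part file: part-0-" + index + ".txt");
    }
}
```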
>> 
>> The partSuffix is only appended here, right before the writer is created, but it should
>> already be appended before the index checks:
>> if (partSuffix != null) {
>>    partPath = partPath.suffix(partSuffix);
>> }
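A sketch of the fix described above, using the same kind of in-memory model (hypothetical names, not the code from the PR): the suffix is appended to the candidate name before the existence checks. As before, the "_" prefix and ".pending" / ".in-progress" suffixes are assumed values.

```java
import java.util.Set;

// In-memory model of the proposed fix; not Flink code.
// The "_" prefix and ".pending" / ".in-progress" suffixes are assumed values.
public class FixedPartIndexSearch {

    static String pending(String name)    { return "_" + name + ".pending"; }
    static String inProgress(String name) { return "_" + name + ".in-progress"; }

    // Builds prefix-subtask-counter plus the optional suffix, i.e. the real file name.
    static String partName(String prefix, int subtask, int counter, String suffix) {
        String name = prefix + "-" + subtask + "-" + counter;
        return suffix != null ? name + suffix : name;
    }

    // The suffix is part of every candidate name, so existing files are detected.
    static int nextPartIndex(Set<String> fs, String prefix, int subtask, int counter, String suffix) {
        String part = partName(prefix, subtask, counter, suffix);
        while (fs.contains(part) || fs.contains(pending(part)) || fs.contains(inProgress(part))) {
            counter++;
            part = partName(prefix, subtask, counter, suffix);
        }
        return counter;
    }

    public static void main(String[] args) {
        // One finished file and one pending file, both written with the ".txt" suffix.
        Set<String> fs = Set.of("part-0-0.txt", "_part-0-1.txt.pending");
        int index = nextPartIndex(fs, "part", 0, 0, ".txt");
        System.out.println("next part file: " + partName("part", 0, index, ".txt"));
    }
}
```

Because the checked names now match what is actually on disk, finished, pending, and in-progress files all advance the counter, and a null suffix keeps the old behavior.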
>> I’ll create an issue and try to submit a fix.
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.sharipov@cleverdata.ru
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 
