beam-commits mailing list archives

From "Daniel Halperin (JIRA)" <>
Subject [jira] [Closed] (BEAM-365) TextIO withoutSharding causes Flink to throw IllegalStateException
Date Wed, 29 Jun 2016 00:17:10 GMT


Daniel Halperin closed BEAM-365.
       Resolution: Fixed
    Fix Version/s: 0.2.0-incubating

> TextIO withoutSharding causes Flink to throw IllegalStateException
> ------------------------------------------------------------------
>                 Key: BEAM-365
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.2.0-incubating
>            Reporter: Pawel Szczur
>            Assignee: Daniel Halperin
>             Fix For: 0.2.0-incubating
> The exception: 
> {code}java.lang.IllegalStateException: Shard name template '' only generated 1 distinct file names for 3 files{code}
> The initial discussion took place some time ago; back then, {{withoutSharding}} was silently ignored by the runner.
> Explanation from Aljoscha Krettek:
> {quote}
> Hi,
> the issue is a bit more complicated and involves the Beam sink API and the
> Flink runner.
> I'll have to get a bit into how Beam sinks work. The base class for sinks
> is Sink (TextIO.write gets translated to TextSink()). A write to a sink
> normally gets translated to three ParDo operations that cooperate
> to do the writing:
>  - "Initialize": this initializes the Sink. It runs only once per sink,
> non-parallel.
>  - "WriteBundles": this gets an initialized sink on a side-input and the
> values to write on the main input. This runs in parallel, so for Flink, if
> you set parallelism=6 you'll get 6 parallel instances of this operation at
> runtime. This operation forwards information about where it writes to
> downstream. This does not write to the final file location but an
> intermediate staging location.
>  - "Finalize": This gets the initialized sink on the main-input and the
> information about written files from "WriteBundles" as a side-input. This
> also only runs once, non-parallel. Here we're writing the intermediate
> files to a final location based on the sharding template.
> The problem is that TextSink, as well as all other sinks, is not aware
> of the number of shards. If you set "withoutSharding()" this
> will set the shard template to "" (empty string) and the number of shards
> to 1. "WriteBundles", however, is not aware of this and will write 6
> intermediate files if you set parallelism=6. In "Finalize" we will copy an
> intermediate file to the same final location 6 times based on the sharding
> template. The end result is that you only get one of the six result shards.
> The reason this occurs only in the Flink runner is that all other
> runners have special overrides for TextIO.Write and AvroIO.Write that kick
> in if sharding control is required. So, for the time being this is a Flink
> runner bug and we might have to introduce special overrides as well until
> this is solved in the general case.
> Cheers,
> Aljoscha
> {quote}
> Original discussion:

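The collision described in the quoted explanation can be reproduced with a small toy model. This is only a sketch, not Beam's or the Flink runner's code: the class name, the method names, the staging paths, and the simplified template expansion (runs of 'S' become the zero-padded shard index, runs of 'N' the zero-padded shard count) are all assumptions made for illustration. It also assumes that the uniqueness check happens while final names are computed, which the report does not state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the "WriteBundles" and "Finalize" phases; hypothetical, not Beam code. */
public class ShardNameCollisionSketch {

    /** "WriteBundles": each parallel instance writes one intermediate file. */
    public static List<String> writeBundles(int parallelism) {
        List<String> tempFiles = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tempFiles.add("/tmp/staging/bundle-" + i); // made-up staging path
        }
        return tempFiles;
    }

    /** Simplified template expansion: 'S' runs -> shard index, 'N' runs -> shard count. */
    public static String expand(String template, int shard, int numShards) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            if (c == 'S' || c == 'N') {
                int j = i;
                while (j < template.length() && template.charAt(j) == c) {
                    j++;
                }
                int value = (c == 'S') ? shard : numShards;
                out.append(String.format("%0" + (j - i) + "d", value));
                i = j;
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }

    /** "Finalize": map every intermediate file to a final name and reject collisions. */
    public static List<String> finalizeWrite(String prefix, String template, List<String> tempFiles) {
        int n = tempFiles.size();
        List<String> finalNames = new ArrayList<>();
        for (int shard = 0; shard < n; shard++) {
            finalNames.add(prefix + expand(template, shard, n));
        }
        Set<String> distinct = new HashSet<>(finalNames);
        if (distinct.size() != n) {
            // An empty template yields the same final name for every intermediate file.
            throw new IllegalStateException("Shard name template '" + template
                    + "' only generated " + distinct.size()
                    + " distinct file names for " + n + " files");
        }
        return finalNames;
    }

    public static void main(String[] args) {
        List<String> temp = writeBundles(3);
        // A template containing the shard index gives one final name per file.
        System.out.println(finalizeWrite("out", "-SSSSS-of-NNNNN", temp));
        try {
            // The empty template, as set by withoutSharding(), collides.
            finalizeWrite("out", "", temp);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the failure does not depend on the data being written: any parallelism above 1 combined with an empty template produces fewer distinct final names than intermediate files. That is consistent with the explanation that the writing step would need to know the requested number of shards to avoid the problem.
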
This message was sent by Atlassian JIRA