beam-user mailing list archives

From Robert Bradshaw <rober...@google.com>
Subject Re: Unbalanced FileIO writes on Flink
Date Fri, 26 Oct 2018 09:41:54 GMT
I think it's worth adding a URN for the operation of distributing "evenly"
into an "appropriate" number of shards. A naive implementation would add
random keys and do a ReshufflePerKey, but runners could override this to do
a reshuffle and then key by whatever notion of bundle/worker/shard
identifier they have that lines up with the number of actual workers.
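
For reference, a minimal sketch of that naive expansion, written against the
Java SDK from Scala (the `distributeEvenly` helper below is hypothetical and
not an existing Beam transform; a real URN would live at the portability
layer, and coders for the keyed output would still need to be inferable):

    import java.util.concurrent.ThreadLocalRandom

    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.transforms.{DoFn, ParDo, Reshuffle}
    import org.apache.beam.sdk.values.{KV, PCollection}

    // Tag each element with a random shard key in [0, numShards) and then
    // reshuffle, so downstream bundles line up with the shard keys. A runner
    // overriding the URN could instead key by its own worker/shard identifier
    // and skip the random assignment entirely.
    def distributeEvenly[T](input: PCollection[T], numShards: Int): PCollection[KV[Integer, T]] =
      input
        .apply("AssignRandomShard", ParDo.of(new DoFn[T, KV[Integer, T]] {
          @ProcessElement
          def process(c: DoFn[T, KV[Integer, T]]#ProcessContext): Unit =
            c.output(KV.of(Int.box(ThreadLocalRandom.current().nextInt(numShards)), c.element()))
        }))
        .apply("ReshufflePerKey", Reshuffle.of[Integer, T]())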

On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:

> Thanks for the JIRA. If I understand it correctly ... so runner-determined
> sharding will avoid the extra shuffle? Will it just write the data available
> locally on the worker to its shard? Something similar to coalesce in Spark?
>
> On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org>
> wrote:
>
>> Oh ok, thanks for the pointer. Coming from Flink, the default is that
>> the sharding is determined by the runtime distribution. Indeed, we will
>> have to add an override to the Flink Runner, similar to this one:
>>
>>
>> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>>
>> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>>
>> Thanks,
>> Max
>>
>> On 25.10.18 22:37, Reuven Lax wrote:
>> > FYI the Dataflow runner automatically sets the default number of shards
>> > (I believe it is 2 * num_workers). Probably we should do something
>> > similar for the Flink runner.
>> >
>> > This needs to be done by the runner, as # of workers is a runner
>> > concept; the SDK itself has no concept of workers.
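
Until such an override exists in the FlinkRunner, a hedged user-side
workaround is to mirror that heuristic when constructing the pipeline. The
sketch below assumes the pipeline is built from command-line args and that
the Flink parallelism is set explicitly on the options (getParallelism() is
assumed here to return -1 when unset):

    import org.apache.beam.runners.flink.FlinkPipelineOptions
    import org.apache.beam.sdk.options.PipelineOptionsFactory

    val options = PipelineOptionsFactory.fromArgs(args: _*).as(classOf[FlinkPipelineOptions])

    // Roughly 2 shards per worker, mirroring the Dataflow default mentioned
    // above; fall back to a fixed shard count when parallelism is not set.
    val parallelism: Int = options.getParallelism
    val numShards = if (parallelism > 0) 2 * parallelism else 5

    // ... later in the pipeline:
    //   .apply(FileIO.write().via(TextIO.sink()).withNumShards(numShards))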
>> >
>> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>> > <mailto:jozo.vilcek@gmail.com>> wrote:
>> >
>> >     If I do not specify shards for an unbounded collection, I get:
>> >
>> >     Caused by: java.lang.IllegalArgumentException: When applying
>> >     WriteFiles to an unbounded PCollection, must specify number of
>> >     output shards explicitly
>> >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>> >              at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>> >
>> >     Around the same lines in WriteFiles there is also a check for windowed
>> >     writes. I believe FileIO enables it explicitly when windowing is present.
>> >     In the filesystem, written files are per window and per shard.
>> >
>> >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org
>> >     <mailto:mxm@apache.org>> wrote:
>> >
>> >         I agree it would be nice to keep the current distribution of
>> >         elements
>> >         instead of doing a shuffle based on an artificial shard key.
>> >
>> >         Have you tried `withWindowedWrites()`? Also, why do you say you
>> >         need to
>> >         specify the number of shards in streaming mode?
>> >
>> >         -Max
>> >
>> >         On 25.10.18 10:12, Jozef Vilcek wrote:
>> >          > Hm, yes, this makes sense now, but what can be done for my
>> >          > case? I do not want to end up with too many files on disk.
>> >          >
>> >          > I think what I am looking for is to instruct the IO not to do
>> >          > the random shard assignment and reshuffle again, but to just
>> >          > assume the number of shards is equal to the number of workers
>> >          > and the shard ID is the worker ID.
>> >          > Is this doable in the Beam model?
>> >          >
>> >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>> >          >
>> >          >     The FlinkRunner uses a hash function (MurmurHash) on each
>> >          >     key which places keys somewhere in the hash space. The hash
>> >          >     space (2^32) is split among the partitions (5 in your case).
>> >          >     Given enough keys, the chance increases that they are spread
>> >          >     equally.
>> >          >
>> >          >     This should be similar to what the other Runners do.
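
A simplified illustration of that range-style assignment (this is not Flink's
exact key-group logic, String.hashCode stands in for MurmurHash, and the
`partitionFor` helper exists only for illustration):

    // Hash the key into a fixed 32-bit space, then map the hash to one of
    // `parallelism` equally sized ranges. With only a handful of distinct
    // shard keys, several keys can land in the same range while other
    // ranges (i.e. workers) receive nothing.
    def partitionFor(key: String, parallelism: Int): Int = {
      val hashSpace = 1L << 32                      // 2^32 possible hash values
      val hash = key.hashCode.toLong & 0xFFFFFFFFL  // stand-in for MurmurHash
      ((hash * parallelism) / hashSpace).toInt      // range the hash falls into
    }

    // With a withNumShards(5) write on parallelism 5, the 5 shard keys may
    // well map to fewer than 5 distinct ranges.
    (0 until 5).map(shard => partitionFor("shard-" + shard, 5))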
>> >          >
>> >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>> >          >      >
>> >          >      > So if I run 5 workers with 50 shards, I end up with:
>> >          >      >
>> >          >      >   Duration      Bytes received     Records received
>> >          >      >   2m 39s        900 MB            465,525
>> >          >      >   2m 39s       1.76 GB            930,720
>> >          >      >   2m 39s        789 MB            407,315
>> >          >      >   2m 39s       1.32 GB            698,262
>> >          >      >   2m 39s        788 MB            407,310
>> >          >      >
>> >          >      > Still not good but better than with 5 shards where
>> >         some workers
>> >          >     did not
>> >          >      > participate at all.
>> >          >      > So, the problem is in some layer which distributes
>> >          >      > keys / shards among workers?
>> >          >      >
>> >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>> >          >      >
>> >          >      >     withNumShards(5) generates 5 random shards. It turns
>> >          >      >     out that, statistically, when you generate 5 random
>> >          >      >     shards and you have 5 workers, the probability is
>> >          >      >     reasonably high that some workers will get more than
>> >          >      >     one shard (and as a result not all workers will
>> >          >      >     participate). Are you able to set the number of
>> >          >      >     shards larger than 5?
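
To put a rough number on "reasonably high" (assuming each of the 5 shard keys
lands on one of the 5 workers uniformly and independently, which is only an
approximation of the hash-based assignment):

    val workers = 5
    val shards  = 5

    // P(every worker receives exactly one of the 5 shards) = 5! / 5^5
    val factorial = (1 to shards).map(_.toDouble).product        // 120
    val pAllWorkersBusy = factorial / math.pow(workers, shards)  // 120 / 3125

    println(f"P(all workers participate) = $pAllWorkersBusy%.3f")  // ~ 0.038

Under that assumption, some worker ends up with no shard roughly 96% of the
time, which is consistent with the skew reported above.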
>> >          >      >
>> >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>> >          >      >
>> >          >      >         cc (dev)
>> >          >      >
>> >          >      >         I tried to run the example with the FlinkRunner
>> >          >      >         in batch mode and again received a bad data
>> >          >      >         spread among the workers.
>> >          >      >
>> >          >      >         When I tried to remove the number of shards for
>> >          >      >         batch mode in the above example, the pipeline
>> >          >      >         crashed before launch:
>> >          >      >
>> >          >      >         Caused by: java.lang.IllegalStateException:
>> >          >      >         Inputs to Flatten had incompatible triggers:
>> >          >      >         AfterWatermark.pastEndOfWindow()
>> >          >      >           .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >          >      >           .withLateFirings(AfterFirst.of(
>> >          >      >             Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >          >      >             Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>> >          >      >         AfterWatermark.pastEndOfWindow()
>> >          >      >           .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>> >          >      >           .withLateFirings(AfterFirst.of(
>> >          >      >             Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> >          >      >             Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>> >          >      >
>> >          >      >
>> >          >      >
>> >          >      >
>> >          >      >
>> >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>> >          >      >
>> >          >      >             Hi Max,
>> >          >      >
>> >          >      >             I forgot to mention that the example is run
>> >          >      >             in streaming mode, therefore I cannot do
>> >          >      >             writes without specifying shards. FileIO
>> >          >      >             explicitly asks for them.
>> >          >      >
>> >          >      >             I am not sure where the problem is. The
>> >          >      >             FlinkRunner is the only one I used.
>> >          >      >
>> >          >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mxm@apache.org> wrote:
>> >          >      >
>> >          >      >                 Hi Jozef,
>> >          >      >
>> >          >      >                 This does not look like a FlinkRunner-related
>> >          >      >                 problem, but is caused by the `WriteFiles`
>> >          >      >                 sharding logic. It assigns keys and does a
>> >          >      >                 Reshuffle which apparently does not lead to
>> >          >      >                 a good data spread in your case.
>> >          >      >
>> >          >      >                 Do you see the same behavior without
>> >          >     `withNumShards(5)`?
>> >          >      >
>> >          >      >                 Thanks,
>> >          >      >                 Max
>> >          >      >
>> >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>> >          >      >                  > Hello,
>> >          >      >                  >
>> >          >      >                  > I am having some trouble getting a
>> >          >      >                  > balanced write via FileIO. Workers at
>> >          >      >                  > the shuffle side, where data per window
>> >          >      >                  > fire are written to the filesystem,
>> >          >      >                  > receive an unbalanced number of events.
>> >          >      >                  >
>> >          >      >                  > Here is a naive code example:
>> >          >      >                  >
>> >          >      >                  >      val read = KafkaIO.read()
>> >          >      >                  >          .withTopic("topic")
>> >          >      >                  >          .withBootstrapServers("kafka1:9092")
>> >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >          .withProcessingTime()
>> >          >      >                  >
>> >          >      >                  >      pipeline
>> >          >      >                  >          .apply(read)
>> >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>> >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
>> >          >      >                  >            }
>> >          >      >                  >          }))
>> >          >      >                  >
>> >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >          >      >                  >            .triggering(AfterWatermark.pastEndOfWindow()
>> >          >      >                  >              .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >          >      >                  >              .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >          >      >                  >                Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >          >      >                  >                Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>> >          >      >                  >            .discardingFiredPanes()
>> >          >      >                  >            .withAllowedLateness(Duration.standardDays(7)))
>> >          >      >                  >
>> >          >      >                  >          .apply(FileIO.write()
>> >          >      >                  >              .via(TextIO.sink())
>> >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >          >      >                  >              .withTempDirectory(tempLocation)
>> >          >      >                  >              .withNumShards(5))
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >                  > If I run this on Beam 2.6.0 with Flink
>> >          >      >                  > 1.5.0 on 5 workers (equal to the number
>> >          >      >                  > of shards), I would expect that each
>> >          >      >                  > worker will participate in persisting
>> >          >      >                  > shards, and equally so, since the code
>> >          >      >                  > uses a fixed number of shards (and
>> >          >      >                  > random shard assignment?). But reality
>> >          >      >                  > is different (see the 2 attachments -
>> >          >      >                  > statistics from the Flink task reading
>> >          >      >                  > from Kafka and the task writing to files).
>> >          >      >                  >
>> >          >      >                  > What am I missing? How can I achieve
>> >          >      >                  > balanced writes?
>> >          >      >                  >
>> >          >      >                  > Thanks,
>> >          >      >                  > Jozef
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >
>> >          >
>> >
>>
>
