beam-user mailing list archives

From Jean-Baptiste Onofré ...@nanthrax.net>
Subject Re: Items not grouped correctly - CoGroupByKey - FlinkPipelineRunner
Date Tue, 31 May 2016 14:49:09 GMT
It sounds like a bug in the Flink runner. I'm pretty sure Max and 
Aljoscha will fix that soon ;)

Regards
JB

On 05/31/2016 03:47 PM, Pawel Szczur wrote:
> FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315
>
> 2016-05-31 14:51 GMT+02:00 Pawel Szczur <pawelszczur@gmail.com>:
>
>     I've also added a test for GroupByKey. It fails. That kind of
>     makes Flink broken at the moment, doesn't it?
>
>     I'm wondering... could it be related to some windowing issue?
>
>     2016-05-31 14:40 GMT+02:00 Pawel Szczur <pawelszczur@gmail.com>:
>
>         I've just tested it. It fails.
>
>         Also added the test to the repo:
>         https://github.com/orian/cogroup-wrong-grouping
>
>         I reason this means that GroupByKey itself is flawed? If you
>         open an official issue, please add it to the discussion.
>
>         2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>
>             Does 2. work for the cases where CoGroupByKey fails? Reason
>             I'm asking is that CoGroupByKey is essentially implemented
>             like that internally: create tagged union -> flatten ->
>             GroupByKey.
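>
>             Roughly, the hand-rolled version of that would look like the
>             sketch below (illustrative names only; it assumes both inputs
>             are PCollection<KV<String, String>> and the usual Beam SDK
>             imports, so it's not the actual internal implementation):
>
>             PCollection<KV<String, KV<Integer, String>>> taggedRecords =
>                 records.apply("TagRecords", MapElements.via(
>                     new SimpleFunction<KV<String, String>,
>                                        KV<String, KV<Integer, String>>>() {
>                       @Override
>                       public KV<String, KV<Integer, String>> apply(
>                           KV<String, String> e) {
>                         // tag 0 marks values from the records input
>                         return KV.of(e.getKey(), KV.of(0, e.getValue()));
>                       }
>                     }));
>
>             PCollection<KV<String, KV<Integer, String>>> taggedConfigs =
>                 configs.apply("TagConfigs", MapElements.via(
>                     new SimpleFunction<KV<String, String>,
>                                        KV<String, KV<Integer, String>>>() {
>                       @Override
>                       public KV<String, KV<Integer, String>> apply(
>                           KV<String, String> e) {
>                         // tag 1 marks values from the configs input
>                         return KV.of(e.getKey(), KV.of(1, e.getValue()));
>                       }
>                     }));
>
>             // Flatten and group: every key should come out exactly once,
>             // with all tagged values from both inputs in one iterable.
>             PCollection<KV<String, Iterable<KV<Integer, String>>>> grouped =
>                 PCollectionList.of(taggedRecords).and(taggedConfigs)
>                     .apply(Flatten.<KV<String, KV<Integer, String>>>pCollections())
>                     .apply(GroupByKey.<String, KV<Integer, String>>create());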
>
>             On Tue, 31 May 2016 at 01:16 Pawel Szczur <pawelszczur@gmail.com> wrote:
>
>                 I've naively tried a few other key types; it seems to
>                 be unrelated to the key type.
>
>                 For now I have two workarounds, plus ignoring the issue:
>                   1. If there is one dominant dataset and the other
>                 datasets are small (size << GB), then I use a side input
>                 (see the sketch below).
>                   2. If I have multiple datasets of similar size, I wrap
>                 them in a common container, flatten them and GroupByKey.
>                   3. I measure the occurrences and ignore the bug for now.
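>
>                 For reference, workaround 1 looks roughly like this (just a
>                 sketch against the newer DoFn API; the Record/Config types
>                 and field names are made up):
>
>                 // Broadcast the small Configs dataset as a Map side input
>                 // and look it up while processing the large Records set.
>                 final PCollectionView<Map<String, Config>> configView =
>                     configs.apply(View.<String, Config>asMap());
>
>                 PCollection<KV<String, Record>> withConfig = records.apply(
>                     ParDo.of(new DoFn<KV<String, Record>, KV<String, Record>>() {
>                       @ProcessElement
>                       public void processElement(ProcessContext c) {
>                         Config cfg = c.sideInput(configView).get(c.element().getKey());
>                         if (cfg != null) {
>                           // the real code attaches cfg to the record here
>                           c.output(c.element());
>                         }
>                       }
>                     }).withSideInputs(configView));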
>
>                 Do you have an idea how a test for this could be
>                 constructed? It would be handy, I think.
>
>                 I also found two things, maybe they help you:
>                   1. the issue doesn't appear without parallelism
>                   2. the issue doesn't appear with a tiny dataset
>
>                 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>
>                     You're right. I'm still looking into this,
>                     unfortunately I haven't made progress so far. I'll
>                     keep you posted.
>
>                     On Sun, 29 May 2016 at 18:20 Pawel Szczur <pawelszczur@gmail.com> wrote:
>
>                         Hi,
>
>                         I used the config as in the repo.
>                         Please grep the log for
>                         "hereGoesLongStringID0,2"; you will see that
>                         this key is processed multiple times.
>
>                         This is how I understand CoGroupByKey: one has
>                         two (or more) PCollection<KV<K,?>>. Both sets
>                         are grouped by key. For each unique key a
>                         KV<K, CoGbkResult> is produced; the CoGbkResult
>                         contains all values from all input PCollections
>                         which have that key.
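>
>                         In code, that understanding corresponds roughly
>                         to this sketch (made-up Record/Config types, not
>                         the actual pipeline):
>
>                         final TupleTag<Record> recordsTag = new TupleTag<Record>() {};
>                         final TupleTag<Config> configsTag = new TupleTag<Config>() {};
>
>                         // records: PCollection<KV<String, Record>>
>                         // configs: PCollection<KV<String, Config>>
>                         PCollection<KV<String, CoGbkResult>> joined =
>                             KeyedPCollectionTuple.of(recordsTag, records)
>                                 .and(configsTag, configs)
>                                 .apply(CoGroupByKey.<String>create());
>
>                         // Expected: exactly one CoGbkResult per unique key;
>                         // result.getAll(recordsTag) / result.getAll(configsTag)
>                         // then hold all matching values from each input.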
>
>                         But from the log it seems that each key produced
>                         more than one CoGbkResult.
>
>                         The final counters didn't catch the bug because
>                         in your case, the value from dataset1 was
>                         replicated for each key.
>
>                         Cheers, Pawel
>
>                         2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>
>                             Hi,
>                             I ran your data generator with these configs:
>                             p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>                                 .apply(ParDo.of(new Generator()))
>                                 .apply(AvroIO.Write.to("/tmp/dataset1")
>                                     .withSchema(DumbData.class)
>                                     .withNumShards(6));
>
>                             p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>                                 .apply(ParDo.of(new Generator()))
>                                 .apply(AvroIO.Write.to("/tmp/dataset2")
>                                     .withSchema(DumbData.class)
>                                     .withNumShards(6));
>
>                             Then I ran the job with parallelism=6. I
>                             couldn't reproduce the problem; this is the
>                             log file from one of several runs:
>                             https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>
>                             Could you please send me the exact config
>                             that you used? Btw, I ran it inside an IDE;
>                             do the problems also occur in the IDE for
>                             you, or only when you execute on a cluster?
>
>                             Cheers,
>                             Aljoscha
>
>                             On Sun, 29 May 2016 at 01:51 Pawel Szczur <pawelszczur@gmail.com> wrote:
>
>                                 Hi Aljoscha.
>
>                                 I've created a repo with a fake dataset
>                                 to make it easy to reproduce the problem:
>                                 https://github.com/orian/cogroup-wrong-grouping
>
>                                 What I noticed: if the dataset is too
>                                 small the bug doesn't appear.
>
>                                 You can modify the size of the dataset,
>                                 but ideally it should be a few hundred
>                                 thousand records per key (I guess it
>                                 depends on the machine you run it on).
>
>                                 Cheers, Pawel
>
>                                 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>
>                                     Hi,
>                                     which version of Beam/Flink are you
>                                     using?
>
>                                     Could you maybe also provide example
>                                     data and code that showcases the
>                                     problem? If you have concerns about
>                                     sending it to a public list you can
>                                     also send it to me directly.
>
>                                     Cheers,
>                                     Aljoscha
>
>                                     On Fri, 27 May 2016 at 20:53 Pawel Szczur <pawelszczur@gmail.com> wrote:
>
>                                         *Data description.*
>
>                                         I have two datasets.
>
>                                         Records - the first, contains
>                                         around 0.5-1M records per
>                                         (key, day). For testing I use 2-3
>                                         keys and 5-10 days of data; what
>                                         I'm aiming for is 1000+ keys. Each
>                                         record contains a key, a timestamp
>                                         in microseconds and some other data.
>                                         Configs - the second, is rather
>                                         small. It describes a key over
>                                         time, e.g. you can think of it
>                                         as a list of tuples: (key,
>                                         start date, end date, description).
>
>                                         For the exploration I've encoded
>                                         the data as files of
>                                         length-prefixed, binary-encoded
>                                         Protocol Buffer messages.
>                                         Additionally, the files are
>                                         gzip-compressed. The data is
>                                         sharded by date. Each file is
>                                         around 10MB.
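>
>                                         (For reference, a shard can be read
>                                         back in plain Java roughly like the
>                                         sketch below; it assumes the prefix
>                                         is the varint written by
>                                         writeDelimitedTo and that Record is
>                                         the generated message class.)
>
>                                         try (InputStream in = new GZIPInputStream(
>                                             new FileInputStream("records-shard.pb.gz"))) {
>                                           Record r;
>                                           // one varint length prefix plus one
>                                           // serialized message per call
>                                           while ((r = Record.parseDelimitedFrom(in)) != null) {
>                                             process(r);  // placeholder
>                                           }
>                                         }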
>
>                                         *Pipeline*
>
>                                         First I add keys to both
>                                         datasets. For the Records dataset
>                                         the key is (key, timestamp rounded
>                                         down to the day). For Configs the
>                                         key is (key, day), where day is
>                                         every midnight timestamp between
>                                         the start date and the end date.
>                                         The datasets are then merged using
>                                         CoGroupByKey.
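>
>                                         As a sketch, the keying of Records
>                                         looks roughly like this (accessor
>                                         names are made up; the real code
>                                         uses Tuple2 keys as mentioned below):
>
>                                         // Key each Record by (key, timestamp
>                                         // rounded down to the day, in usec).
>                                         PCollection<KV<Tuple2<String, Long>, Record>> keyedRecords =
>                                             records.apply(ParDo.of(
>                                                 new DoFn<Record, KV<Tuple2<String, Long>, Record>>() {
>                                                   @ProcessElement
>                                                   public void processElement(ProcessContext c) {
>                                                     Record r = c.element();
>                                                     long usecPerDay = 86_400_000_000L;
>                                                     long day = (r.getTimestampUsec() / usecPerDay) * usecPerDay;
>                                                     c.output(KV.of(Tuple2.of(r.getKey(), day), r));
>                                                   }
>                                                 }));
>                                         // Configs are keyed the same way, emitting
>                                         // one (key, day) entry per day between
>                                         // start date and end date.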
>
>                                         As the key type I use
>                                         org.apache.flink.api.java.tuple.Tuple2
>                                         with a Tuple2Coder from this repo.
>
>                                         *The problem*
>
>                                         If the Records dataset is tiny,
>                                         like 5 days, everything seems
>                                         fine (check normal_run.log).
>
>                                           INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>                                           INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>                                           INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>                                           INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
>                                         When I run the pipeline against
>                                         10+ days I encounter an error
>                                         indicating that for some Records
>                                         there is no Config (wrong_run.log).
>
>                                           INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>                                           INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>                                           INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>                                           INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
>                                         Then I've added some extra
>                                         logging messages:
>
>                                         (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
>                                         (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
>                                         (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
>                                         (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
>                                         (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
>                                         (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
>                                         (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
>                                         (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
>                                         (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
>                                         (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
>                                         (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
>                                         (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>
>                                         You can spot that in the first
>                                         line 68643 items were processed
>                                         for KeyValue3 and time
>                                         1462665600000000.
>                                         Later, in line 9, the operation
>                                         seems to process the same key
>                                         again, but reports that no
>                                         Config was available for these
>                                         Records.
>                                         Line 10 shows they've been
>                                         marked as no-loc.
>
>                                         Line 2 says that there were no
>                                         items for KeyValue3 and time
>                                         1463184000000000, but in line 11
>                                         you can read that the items for
>                                         this (key, day) pair were
>                                         processed later and lacked a
>                                         Config.
>
>                                         *Work-around (after more testing
>                                         it doesn't work; staying
>                                         with Tuple2)*
>
>                                         I've switched from using Tuple2
>                                         to a Protocol Buffer message:
>
>                                         message KeyDay {
>                                           // bytes maps to ByteString in Java
>                                           optional bytes key = 1;
>                                           optional int64 timestamp_usec = 2;
>                                         }
>
>                                         But using Tuple2.of() was just
>                                         easier than:
>                                         KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>
>                                         // The original description
>                                         comes from:
>                                         http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
