beam-user mailing list archives

From Leonardo Campos <leonardo.cam...@gameduell.de>
Subject SortValues - Ordering elements for each user
Date Tue, 31 Jul 2018 14:26:35 GMT
We have a feature request that demands ordering elements for a given user.

Currently, as an option, I'm investigating SortValues from the Java SDK 
extensions (https://beam.apache.org/documentation/sdks/java-extensions/).

Problem: Our simple test doesn't show the results in order.

I saw that MapElements internally uses a ParDo, which is probably 
why I'm getting the elements out of order. If so, is there a way to avoid this?

Here is the code (running with Direct Runner):

// Creating the example input messages
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
     testMessages.add("{\"id\":123456789, \"time\":" + (time) + "\", time_ts\":\"" + new Instant(time) + "\"}");
     time += TimeUnit.SECONDS.toMillis(1);
}

TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor = TypeDescriptors.kvs(TypeDescriptors.longs(),
TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));

Pipeline pipe = Pipeline.create();
pipe
         .apply("Create with test input", Create.of(testMessages))
         .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
MapElements.into(typeDescriptor).via(eventStr -> {
             Any event = JsonIterator.deserialize(eventStr);
             return KV.of(event.get("id").toLong(), KV.of(event.get("time").toLong(), eventStr));
         }))
         .apply(GroupByKey.<Long, KV<Long, String>>create())
         .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
         .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable->{
             // Shouldn't this be sorted by the secondary key, which is the time?
             StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
                     .map(KV::getValue)
                     .forEach(System.out::println);
             return "";
         }));

pipe.run();

Output in console:
{"id":123456789, "time":1533046491394", time_ts":"2018-07-31T14:14:51.394Z"}
{"id":123456789, "time":1533046475394", time_ts":"2018-07-31T14:14:35.394Z"}
{"id":123456789, "time":1533046459394", time_ts":"2018-07-31T14:14:19.394Z"}
{"id":123456789, "time":1533046443394", time_ts":"2018-07-31T14:14:03.394Z"}
{"id":123456789, "time":1533046427394", time_ts":"2018-07-31T14:13:47.394Z"}
{"id":123456789, "time":1533046411394", time_ts":"2018-07-31T14:13:31.394Z"}
{"id":123456789, "time":1533046395394", time_ts":"2018-07-31T14:13:15.394Z"}
{"id":123456789, "time":1533046480394", time_ts":"2018-07-31T14:14:40.394Z"}
{"id":123456789, "time":1533046464394", time_ts":"2018-07-31T14:14:24.394Z"}
{"id":123456789, "time":1533046448394", time_ts":"2018-07-31T14:14:08.394Z"}
{"id":123456789, "time":1533046432394", time_ts":"2018-07-31T14:13:52.394Z"}
{"id":123456789, "time":1533046416394", time_ts":"2018-07-31T14:13:36.394Z"}
{"id":123456789, "time":1533046400394", time_ts":"2018-07-31T14:13:20.394Z"}
{"id":123456789, "time":1533046485394", time_ts":"2018-07-31T14:14:45.394Z"}
{"id":123456789, "time":1533046469394", time_ts":"2018-07-31T14:14:29.394Z"}
{"id":123456789, "time":1533046453394", time_ts":"2018-07-31T14:14:13.394Z"}
{"id":123456789, "time":1533046437394", time_ts":"2018-07-31T14:13:57.394Z"}
{"id":123456789, "time":1533046421394", time_ts":"2018-07-31T14:13:41.394Z"}
{"id":123456789, "time":1533046405394", time_ts":"2018-07-31T14:13:25.394Z"}
{"id":123456789, "time":1533046490394", time_ts":"2018-07-31T14:14:50.394Z"}
{"id":123456789, "time":1533046474394", time_ts":"2018-07-31T14:14:34.394Z"}
{"id":123456789, "time":1533046458394", time_ts":"2018-07-31T14:14:18.394Z"}
{"id":123456789, "time":1533046442394", time_ts":"2018-07-31T14:14:02.394Z"}
{"id":123456789, "time":1533046426394", time_ts":"2018-07-31T14:13:46.394Z"}
{"id":123456789, "time":1533046410394", time_ts":"2018-07-31T14:13:30.394Z"}
{"id":123456789, "time":1533046394394", time_ts":"2018-07-31T14:13:14.394Z"}
{"id":123456789, "time":1533046479394", time_ts":"2018-07-31T14:14:39.394Z"}
{"id":123456789, "time":1533046463394", time_ts":"2018-07-31T14:14:23.394Z"}
{"id":123456789, "time":1533046447394", time_ts":"2018-07-31T14:14:07.394Z"}
{"id":123456789, "time":1533046431394", time_ts":"2018-07-31T14:13:51.394Z"}
{"id":123456789, "time":1533046415394", time_ts":"2018-07-31T14:13:35.394Z"}
{"id":123456789, "time":1533046399394", time_ts":"2018-07-31T14:13:19.394Z"}
{"id":123456789, "time":1533046484394", time_ts":"2018-07-31T14:14:44.394Z"}
{"id":123456789, "time":1533046468394", time_ts":"2018-07-31T14:14:28.394Z"}
{"id":123456789, "time":1533046452394", time_ts":"2018-07-31T14:14:12.394Z"}
{"id":123456789, "time":1533046436394", time_ts":"2018-07-31T14:13:56.394Z"}
{"id":123456789, "time":1533046420394", time_ts":"2018-07-31T14:13:40.394Z"}
{"id":123456789, "time":1533046404394", time_ts":"2018-07-31T14:13:24.394Z"}
{"id":123456789, "time":1533046489394", time_ts":"2018-07-31T14:14:49.394Z"}
{"id":123456789, "time":1533046473394", time_ts":"2018-07-31T14:14:33.394Z"}
{"id":123456789, "time":1533046457394", time_ts":"2018-07-31T14:14:17.394Z"}
{"id":123456789, "time":1533046441394", time_ts":"2018-07-31T14:14:01.394Z"}
{"id":123456789, "time":1533046425394", time_ts":"2018-07-31T14:13:45.394Z"}
{"id":123456789, "time":1533046409394", time_ts":"2018-07-31T14:13:29.394Z"}
{"id":123456789, "time":1533046393394", time_ts":"2018-07-31T14:13:13.394Z"}
{"id":123456789, "time":1533046478394", time_ts":"2018-07-31T14:14:38.394Z"}
{"id":123456789, "time":1533046462394", time_ts":"2018-07-31T14:14:22.394Z"}
{"id":123456789, "time":1533046446394", time_ts":"2018-07-31T14:14:06.394Z"}
{"id":123456789, "time":1533046430394", time_ts":"2018-07-31T14:13:50.394Z"}
{"id":123456789, "time":1533046414394", time_ts":"2018-07-31T14:13:34.394Z"}
{"id":123456789, "time":1533046398394", time_ts":"2018-07-31T14:13:18.394Z"}
{"id":123456789, "time":1533046483394", time_ts":"2018-07-31T14:14:43.394Z"}
{"id":123456789, "time":1533046467394", time_ts":"2018-07-31T14:14:27.394Z"}
{"id":123456789, "time":1533046451394", time_ts":"2018-07-31T14:14:11.394Z"}
{"id":123456789, "time":1533046435394", time_ts":"2018-07-31T14:13:55.394Z"}
{"id":123456789, "time":1533046419394", time_ts":"2018-07-31T14:13:39.394Z"}
{"id":123456789, "time":1533046403394", time_ts":"2018-07-31T14:13:23.394Z"}
{"id":123456789, "time":1533046456394", time_ts":"2018-07-31T14:14:16.394Z"}
{"id":123456789, "time":1533046440394", time_ts":"2018-07-31T14:14:00.394Z"}
{"id":123456789, "time":1533046424394", time_ts":"2018-07-31T14:13:44.394Z"}
{"id":123456789, "time":1533046408394", time_ts":"2018-07-31T14:13:28.394Z"}
{"id":123456789, "time":1533046392394", time_ts":"2018-07-31T14:13:12.394Z"}
{"id":123456789, "time":1533046488394", time_ts":"2018-07-31T14:14:48.394Z"}
{"id":123456789, "time":1533046472394", time_ts":"2018-07-31T14:14:32.394Z"}
{"id":123456789, "time":1533046477394", time_ts":"2018-07-31T14:14:37.394Z"}
{"id":123456789, "time":1533046461394", time_ts":"2018-07-31T14:14:21.394Z"}
{"id":123456789, "time":1533046445394", time_ts":"2018-07-31T14:14:05.394Z"}
{"id":123456789, "time":1533046429394", time_ts":"2018-07-31T14:13:49.394Z"}
{"id":123456789, "time":1533046413394", time_ts":"2018-07-31T14:13:33.394Z"}
{"id":123456789, "time":1533046397394", time_ts":"2018-07-31T14:13:17.394Z"}
{"id":123456789, "time":1533046482394", time_ts":"2018-07-31T14:14:42.394Z"}
{"id":123456789, "time":1533046466394", time_ts":"2018-07-31T14:14:26.394Z"}
{"id":123456789, "time":1533046450394", time_ts":"2018-07-31T14:14:10.394Z"}
{"id":123456789, "time":1533046434394", time_ts":"2018-07-31T14:13:54.394Z"}
{"id":123456789, "time":1533046418394", time_ts":"2018-07-31T14:13:38.394Z"}
{"id":123456789, "time":1533046402394", time_ts":"2018-07-31T14:13:22.394Z"}
{"id":123456789, "time":1533046407394", time_ts":"2018-07-31T14:13:27.394Z"}
{"id":123456789, "time":1533046487394", time_ts":"2018-07-31T14:14:47.394Z"}
{"id":123456789, "time":1533046471394", time_ts":"2018-07-31T14:14:31.394Z"}
{"id":123456789, "time":1533046455394", time_ts":"2018-07-31T14:14:15.394Z"}
{"id":123456789, "time":1533046439394", time_ts":"2018-07-31T14:13:59.394Z"}
{"id":123456789, "time":1533046423394", time_ts":"2018-07-31T14:13:43.394Z"}
{"id":123456789, "time":1533046476394", time_ts":"2018-07-31T14:14:36.394Z"}
{"id":123456789, "time":1533046460394", time_ts":"2018-07-31T14:14:20.394Z"}
{"id":123456789, "time":1533046444394", time_ts":"2018-07-31T14:14:04.394Z"}
{"id":123456789, "time":1533046428394", time_ts":"2018-07-31T14:13:48.394Z"}
{"id":123456789, "time":1533046412394", time_ts":"2018-07-31T14:13:32.394Z"}
{"id":123456789, "time":1533046396394", time_ts":"2018-07-31T14:13:16.394Z"}
{"id":123456789, "time":1533046481394", time_ts":"2018-07-31T14:14:41.394Z"}
{"id":123456789, "time":1533046465394", time_ts":"2018-07-31T14:14:25.394Z"}
{"id":123456789, "time":1533046449394", time_ts":"2018-07-31T14:14:09.394Z"}
{"id":123456789, "time":1533046433394", time_ts":"2018-07-31T14:13:53.394Z"}
{"id":123456789, "time":1533046417394", time_ts":"2018-07-31T14:13:37.394Z"}
{"id":123456789, "time":1533046401394", time_ts":"2018-07-31T14:13:21.394Z"}
{"id":123456789, "time":1533046486394", time_ts":"2018-07-31T14:14:46.394Z"}
{"id":123456789, "time":1533046470394", time_ts":"2018-07-31T14:14:30.394Z"}
{"id":123456789, "time":1533046454394", time_ts":"2018-07-31T14:14:14.394Z"}
{"id":123456789, "time":1533046438394", time_ts":"2018-07-31T14:13:58.394Z"}
{"id":123456789, "time":1533046422394", time_ts":"2018-07-31T14:13:42.394Z"}
{"id":123456789, "time":1533046406394", time_ts":"2018-07-31T14:13:26.394Z"}

So, the question is: How does SortValues work, so that we can treat each 
message in the order given by the secondary key?
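
One possible explanation, stated as an assumption rather than a confirmed diagnosis: if SortValues compares the encoded bytes of the secondary key lexicographically, and the Long key is encoded as a variable-length integer (least-significant 7 bits first), then byte order and numeric order can disagree. That would be consistent with the output above, where neighbouring lines often differ by exactly 16000 ms (a multiple of 128, so the first encoded byte is identical). The sketch below uses plain Java only, with no Beam classes; `EncodedKeyOrderDemo`, `varInt`, `bigEndian` and `sortByEncoding` are hypothetical names made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class EncodedKeyOrderDemo {

    // Base-128 variable-length encoding, least-significant 7 bits first.
    // Assumes a non-negative input.
    static byte[] varInt(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int bits = (int) (v & 0x7F);
            v >>>= 7;
            // Set the continuation bit when more bytes follow.
            out.write(v != 0 ? (bits | 0x80) : bits);
        } while (v != 0);
        return out.toByteArray();
    }

    // Fixed-width 8-byte big-endian encoding.
    static byte[] bigEndian(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Lexicographic comparison of two byte arrays, treating bytes as unsigned.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Sorts the keys by their encoded bytes, using either encoding.
    static long[] sortByEncoding(long[] keys, boolean useVarInt) {
        return Arrays.stream(keys)
                .boxed()
                .sorted((x, y) -> compareBytes(
                        useVarInt ? varInt(x) : bigEndian(x),
                        useVarInt ? varInt(y) : bigEndian(y)))
                .mapToLong(Long::longValue)
                .toArray();
    }

    public static void main(String[] args) {
        // Same shape as the test input: 100 timestamps, one second apart,
        // starting at the earliest "time" value seen in the console output.
        long base = 1533046392394L;
        long[] keys = new long[100];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = base + i * 1000L;
        }
        System.out.println("varint order:     " + Arrays.toString(sortByEncoding(keys, true)));
        System.out.println("big-endian order: " + Arrays.toString(sortByEncoding(keys, false)));
    }
}
```

For the small input 129, 256, 128, the varint byte order comes out as 128, 256, 129, while the big-endian byte order is the numeric order 128, 129, 256. If this is indeed the cause, encoding the secondary key with a fixed-width big-endian coder (Beam ships a BigEndianLongCoder) or as a zero-padded string might be worth trying; that suggestion has not been verified against this pipeline.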

