flink-dev mailing list archives

From Eron Wright <ewri...@live.com>
Subject RE: Surprising order of events in union of two streams
Date Mon, 18 Apr 2016 19:25:35 GMT
Not entirely related, but for the special case of writing a parallelized source that emits
records in event-time order, I found the MergeIterator to be most useful. Here's an example:
https://github.com/nupic-community/flink-htm/blob/eb29f97f08f3482b32228db7284f669aad8dce2e/flink-htm-streaming-scala/src/main/scala/org/numenta/nupic/flink/streaming/connectors/river/RiverSource.scala#L157
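
For readers who do not want to follow the link, the general idea of merging several already-sorted inputs into one event-time-ordered sequence can be sketched in plain Java. This is only an illustration under assumptions: it does not use Flink's MergeIterator or the RiverSource code, the class name TimestampMerge is hypothetical, and events are reduced to bare long timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper (not Flink's MergeIterator): k-way merge of sorted inputs. */
final class TimestampMerge {

    /** The current head element of one input, plus the rest of that input. */
    private static final class Head {
        final long timestamp;
        final Iterator<Long> rest;

        Head(long timestamp, Iterator<Long> rest) {
            this.timestamp = timestamp;
            this.rest = rest;
        }
    }

    /** Merges inputs that are each sorted ascending into one ascending list. */
    static List<Long> merge(List<Iterator<Long>> inputs) {
        // The heap always holds at most one head per input, smallest timestamp first.
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
        for (Iterator<Long> in : inputs) {
            if (in.hasNext()) {
                heap.add(new Head(in.next(), in));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            out.add(smallest.timestamp);
            // Refill the heap from the input that just produced an element.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> left = Arrays.asList(10L, 12L, 14L);
        List<Long> right = Arrays.asList(9L, 15L, 17L);
        // prints [9, 10, 12, 14, 15, 17]
        System.out.println(merge(Arrays.asList(left.iterator(), right.iterator())));
    }
}
```

The inputs here reuse the Left and Right timestamps from the test case quoted below. The sketch only works because each input is itself sorted; it does nothing for out-of-order elements within a single input.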
-Eron Wright

> From: aljoscha@apache.org
> Date: Mon, 18 Apr 2016 08:20:11 +0000
> Subject: Re: Surprising order of events in union of two streams
> To: dev@flink.apache.org
> CC: namluc.tran@euranova.eu
> 
> Hi,
> yes, I'm afraid you need a custom operator for that. (We are working on
> providing built-in support for this, though)
> 
> I sketched an Operator that does the sorting and also wrote a quick example
> that uses it:
> SortedWindowOperator:
> https://gist.github.com/aljoscha/6600bc1121b7f8a0f68b89988dd341bd
> SortedWindowExample:
> https://gist.github.com/aljoscha/959fc61aff2cd774fe60da711f5ae40b
> 
> It puts incoming elements into buckets based on the timestamp and sorts
> them once the watermark passes the end of a bucket. Take a look at
> processWatermark(), there you would fill in your custom logic.
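
The bucketing described above can be illustrated without any Flink dependency. The following is a minimal sketch, not the SortedWindowOperator from the gist: the class name WatermarkSortBuffer, the fixed bucket size, and the use of bare long timestamps are all assumptions, and the two methods only mimic the roles of an operator's element and watermark callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for the bucketing idea; not the operator from the gist. */
final class WatermarkSortBuffer {
    private final long bucketSize;
    // bucket start -> timestamps buffered in that bucket, in arrival order
    private final TreeMap<Long, List<Long>> buckets = new TreeMap<>();

    WatermarkSortBuffer(long bucketSize) {
        this.bucketSize = bucketSize;
    }

    /** Buffers an element in the bucket that covers its timestamp. */
    void processElement(long timestamp) {
        long bucketStart = timestamp - Math.floorMod(timestamp, bucketSize);
        buckets.computeIfAbsent(bucketStart, k -> new ArrayList<>()).add(timestamp);
    }

    /**
     * Emits, in timestamp order, every bucket that the watermark has passed.
     * A bucket covers [start, start + bucketSize), so it is complete once the
     * watermark has reached its last possible timestamp.
     */
    List<Long> processWatermark(long watermark) {
        List<Long> emitted = new ArrayList<>();
        while (!buckets.isEmpty() && buckets.firstKey() + bucketSize - 1 <= watermark) {
            List<Long> bucket = buckets.pollFirstEntry().getValue();
            Collections.sort(bucket);
            emitted.addAll(bucket);
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkSortBuffer buffer = new WatermarkSortBuffer(5);
        // Arrival order observed in the failing test: 9, 15, 10, 14
        for (long t : new long[] {9, 15, 10, 14}) {
            buffer.processElement(t);
        }
        System.out.println(buffer.processWatermark(14)); // prints [9, 10, 14]
        System.out.println(buffer.processWatermark(19)); // prints [15]
    }
}
```

Custom logic would go where the sorted bucket is appended to the output. Elements that arrive after their bucket has been emitted are not handled in this sketch.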
> 
> Cheers,
> Aljoscha
> 
> On Fri, 15 Apr 2016 at 14:41 Gary Verhaegen <gary.verhaegen@euranova.eu>
> wrote:
> 
> > Hi Aljoscha,
> >
> > What I'm looking for is an operator that joins two streams together, but
> > keeps the events in timestamp order.
> >
> > What I was trying to do with the window specification comes down to: for
> > each event on that stream, I want to call this function with this event and
> > all of the events that arrived within 5 minutes after it. So conceptually a
> > sliding window, except that the width is defined in terms of time and the
> > step in terms of count.
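
To make the intended semantics concrete, here is a small plain-Java sketch of "each event together with everything that arrives within a fixed time after it". It assumes the input is already in timestamp order and that the bound is inclusive; the class name FollowingWindows is hypothetical, and the width of 5 reuses the time units of the test data rather than 5 minutes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a window that starts at every event. */
final class FollowingWindows {

    /**
     * For each timestamp in an ascending list, returns that timestamp followed
     * by all later timestamps that are at most width after it.
     */
    static List<List<Long>> windows(List<Long> sorted, long width) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            long start = sorted.get(i);
            List<Long> window = new ArrayList<>();
            // The step is one element (count-based); the extent is time-based.
            for (int j = i; j < sorted.size() && sorted.get(j) - start <= width; j++) {
                window.add(sorted.get(j));
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> times = Arrays.asList(9L, 10L, 12L, 14L, 15L, 17L);
        for (List<Long> w : windows(times, 5)) {
            System.out.println(w);
        }
    }
}
```

With the six timestamps from the test case, the first window is [9, 10, 12, 14] and the last is [17]. The sketch relies on sorted input, which is exactly the property the union does not provide.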
> >
> > What I will really need to do is probably manage the window myself in
> > operator state (because in many cases I expect that I will not need the
> > whole window, so it may be interesting to be able to evaluate that
> > eagerly), but I think I really need the events to arrive in order.
> >
> > On 14 April 2016 at 17:04, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >
> > > Hi,
> > > Flink does not make any guarantees about the order of arriving elements
> > > except in the case of one-to-one forwarding patterns. That is, only for
> > > map/flatMap/filter and such operations will the order in which two
> > > successive operations see their elements be the same.
> > >
> > > Could you please describe in prose form what the expected outcome of your
> > > windowing specification is. We could start from this and try to figure out
> > > how to make Flink behave as it should.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <gary.verhaegen@euranova.eu>
> > > wrote:
> > >
> > > > Hi list,
> > > >
> > > > I am surprised by the behaviour of the code below. In particular, I am
> > > > puzzled by the fact that events do not seem to enter the window in order.
> > > > What am I doing wrong?
> > > >
> > > > Here's what I don't understand. This test outputs the following error:
> > > >
> > > > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10,
> > > > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
> > > > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
> > > > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
> > > > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
> > > > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>
> > > >
> > > > Now, the test is not complete, so it's not surprising that it fails, but
> > > > what really puzzles me is that there appears to be a moment when my window
> > > > contains an event at time 9 and an event at time 15, but does not yet
> > > > include the events at times 10 and 14, which should be part of the same
> > > > stream (and are indeed added later).
> > > >
> > > > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
> > > > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
> > > > relevant parts of the pom.xml uncommented.
> > > >
> > > > package enx.cep;
> > > >
> > > > import static org.junit.Assert.assertEquals;
> > > >
> > > > import org.apache.flink.streaming.api.TimeCharacteristic;
> > > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
> > > > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > > > import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> > > > import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> > > > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> > > > import org.apache.flink.util.Collector;
> > > > import org.junit.Test;
> > > >
> > > > import java.io.*;
> > > > import java.net.ServerSocket;
> > > > import java.net.Socket;
> > > > import java.net.SocketTimeoutException;
> > > > import java.util.*;
> > > > import java.util.function.Function;
> > > > import java.util.stream.Collectors;
> > > > import java.util.stream.StreamSupport;
> > > >
> > > > public class AlgebraTest {
> > > >     @Test public void flinkCanJoinTwoStreams() throws Exception {
> > > >         final List<Msg> inputs = list(
> > > >                 Msg(9, "Right", "val", 1),
> > > >                 Msg(10, "Left", "attr", 1),
> > > >                 Msg(12, "Left", "attr", 2),
> > > >                 Msg(14, "Left", "attr", 1),
> > > >                 Msg(15, "Right", "val", 1),
> > > >                 Msg(17, "Right", "val", 3));
> > > >         final List<Msg> expected = list(
> > > >                 Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> > > >                         "Right:@t", 9, "Left:@t", 10),
> > > >                 Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> > > >                         "Right:@t", 15, "Left:@t", 14));
> > > >         final List<Msg> output = runStreamAlg(inputs, source -> {
> > > >             final DataStream<Msg> RightSource = source.filter(msg -> "Right".equals(msg.type));
> > > >             final DataStream<Msg> LeftSource = source.filter(msg -> "Left".equals(msg.type));
> > > >
> > > >             // Join Left & Right streams on
> > > >             // Left.attr == Right.val && abs(Left.t - Right.t) < 5
> > > >             final DataStream<Msg> joined = LeftSource.union(RightSource)
> > > >                     .keyBy(msg -> {
> > > >                         if ("Right".equals(msg.type)) {
> > > >                             return msg.attrs.get("val");
> > > >                         } else if ("Left".equals(msg.type)) {
> > > >                             return msg.attrs.get("attr");
> > > >                         } else {
> > > >                             throw new RuntimeException();
> > > >                         }
> > > >                     })
> > > >                     .window(GlobalWindows.create())
> > > >                     .trigger(CountTrigger.of(1))
> > > >                     .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
> > > >                         @Override
> > > >                         public void apply(Object _key, GlobalWindow _w, Iterable<Msg> ins,
> > > >                                 Collector<Msg> collector) throws Exception {
> > > >                             List<Integer> times = StreamSupport.stream(ins.spliterator(), false)
> > > >                                     .map(m -> m.timestamp)
> > > >                                     .collect(Collectors.toList());
> > > >
> > > >                             collector.collect(Msg(times.stream().mapToInt(i -> i).min().getAsInt(),
> > > >                                     "None", "times", times));
> > > >                         }
> > > >                     });
> > > >
> > > >             return joined;
> > > >         });
> > > >         assertEquals(expected, output);
> > > >     }
> > > >
> > > >     private final List<Msg> runStreamAlg(List<Msg> input,
> > > >             Function<DataStream<Msg>, DataStream<Msg>> fn) {
> > > >         final StreamExecutionEnvironment env =
> > > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > > >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > > >         final DataStream<Msg> source = env.fromCollection(input)
> > > >                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
> > > >                     @Override
> > > >                     public long extractAscendingTimestamp(Msg msg) {
> > > >                         return msg.timestamp * 1000;
> > > >                     }
> > > >                 });
> > > >         final DataStream<Msg> transformed = fn.apply(source);
> > > >
> > > >         final List<Msg> res = new ArrayList<>();
> > > >         try (final ServerSocket server = new ServerSocket(0)) {
> > > >             final int serverPort = server.getLocalPort();
> > > >
> > > >             transformed.addSink(m -> {
> > > >                 try (final Socket client = new Socket("localhost", serverPort)) {
> > > >                     final ObjectOutputStream toServer =
> > > >                             new ObjectOutputStream(client.getOutputStream());
> > > >                     toServer.writeObject(m);
> > > >                     toServer.flush();
> > > >                     toServer.close();
> > > >                 }
> > > >             });
> > > >
> > > >             final Thread t = new Thread(() -> {
> > > >                 while (true) {
> > > >                     try (final ObjectInputStream in =
> > > >                             new ObjectInputStream(server.accept().getInputStream())) {
> > > >                         res.add((Msg) in.readObject());
> > > >                         server.setSoTimeout(500);
> > > >                     } catch (SocketTimeoutException e) {
> > > >                         return;
> > > >                     } catch (java.io.IOException | ClassNotFoundException e) {
> > > >                         throw new RuntimeException(e);
> > > >                     }
> > > >                 }
> > > >             });
> > > >             t.start();
> > > >             try {
> > > >                 env.execute();
> > > >             } catch (Exception e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >             t.join();
> > > >         } catch (Exception e) {
> > > >             throw new RuntimeException(e);
> > > >         }
> > > >         return res;
> > > >     }
> > > >
> > > >     private static <T> List<T> list(T elem, T... others) {
> > > >         final List<T> res = new ArrayList<>();
> > > >         res.add(elem);
> > > >         for(T t: others) {
> > > >             res.add(t);
> > > >         }
> > > >         return res;
> > > >     }
> > > >
> > > >     private static Msg Msg(int timestamp, String type, Object... attrs) {
> > > >         return new Msg(timestamp, type, attrs);
> > > >     }
> > > >
> > > >     private static class Msg implements Serializable {
> > > >         private final String type;
> > > >         private final int timestamp;
> > > >         private final Map<String, Object> attrs;
> > > >         public Msg(int timestamp, String type, Object... attrs) {
> > > >             this.timestamp = timestamp;
> > > >             this.type = type;
> > > >             this.attrs = new HashMap<>();
> > > >             if (attrs.length % 2 != 0) throw new IllegalArgumentException();
> > > >             for (int i = 0; i < attrs.length; i += 2) {
> > > >                 if (!(attrs[i] instanceof String)) throw new IllegalArgumentException();
> > > >                 this.attrs.put((String) attrs[i], attrs[i+1]);
> > > >             }
> > > >         }
> > > >
> > > >         public String toString() {
> > > >             return String.format("[%d \"%s\" {%s}]",
> > > >                     this.timestamp,
> > > >                     this.type,
> > > >                     this.attrs.entrySet().stream()
> > > >                             .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey()))
> > > >                             .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
> > > >                             .reduce((acc, el) -> acc + ", " + el)
> > > >                             .orElseGet(() -> ""));
> > > >         }
> > > >     }
> > > > }
> > > >
> > >
> >
 		 	   		  