flink-user mailing list archives

From Nico <nico.franz...@gmail.com>
Subject Re: Events are assigned to wrong window
Date Mon, 16 Jan 2017 11:18:27 GMT
Hi Aljoscha,

I was able to identify the root cause of the problem: it is my first map
function, which uses the ValueState. But first, note that
assignTimestampsAndWatermarks() is called right after the Kafka connector is
created:

FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
      new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);

// Extract the timestamps with a max. delay of 2s
carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(
      new TimestampGenerator(Time.seconds(0)));

In the map function I try to calculate the direction between two GPS
data points. For this, I store the last event in ValueState. The
function looks like this:

private static class BearingMap extends RichMapFunction<Car, Car> {

   private transient ValueState<Car> state;
   private final double maxdiff = 12; // in seconds

   @Override
   public Car map(Car destination) throws Exception {

      Car origin = state.value();
      double olat, olon, dlat, dlon;

      /*
       * If the state is empty, do not compute a direction; just store the
       * event in the state
       */
      if (origin == null){
         state.update(destination);
         // return the Car unchanged
         return destination;
      }

      double diff = origin.getTimestamp()-destination.getTimestamp();

      System.out.println("Differenz: " + diff);

      if (Math.abs(diff) <= maxdiff * 1000) {

         /*
          * For late events that still fall within the allowed delay
          */
         if(diff > 0){
            Car tmp = destination;
            destination = origin;
            origin = tmp;
         }

         /*
          * Car tmp is always the origin
          */

         double bearing = Helper.calculateBearing(
               origin.getLat(),origin.getLon(),destination.getLat(),destination.getLon());

         // Update the state
         state.update(destination);

         origin.setDirection(bearing);
         return origin;

      }

      // For events that are too late, keep the current state and return it
      // without a direction
      return origin;

   }


   @Override
   public void open(Configuration parameters) throws Exception {

      ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
            "lastEvent",
            Car.class,
            null
      );

      state = getRuntimeContext().getState(vsd);
   }

}
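
The effect of this function on in-order events can be sketched without Flink. This is only a simplified stand-in, not the code above: the class and method names are hypothetical, a local variable plays the role of the ValueState, and it assumes that a map operator forwards its result with the record timestamp of the element that triggered the call (which is how Flink's map operator behaves as far as I know). It covers only events that arrive in order and within maxdiff.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayedEmitSketch {

    // Simplified stand-in for BearingMap on in-order input: keep the last
    // event and return the previously stored one when the next event arrives.
    // Each result is {recordTimestamp, payloadTimestamp}.
    static List<long[]> run(long[] eventTimestamps) {
        List<long[]> emitted = new ArrayList<>();
        Long stored = null; // plays the role of the ValueState
        for (long current : eventTimestamps) {
            if (stored == null) {
                // Empty state: the first event is stored and passed through unchanged.
                stored = current;
                emitted.add(new long[] {current, current});
                continue;
            }
            // The stored event (origin) is returned, but the record is assumed
            // to keep the timestamp of the current input element.
            emitted.add(new long[] {current, stored});
            stored = current;
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] input = {1484564686236L, 1484564691260L, 1484564696273L};
        for (long[] e : run(input)) {
            System.out.println("record ts: " + e[0] + ", Car ts: " + e[1]
                    + ", lag: " + (e[0] - e[1]) + " ms");
        }
    }
}
```

Under these assumptions, every element after the first leaves the map carrying a Car whose own timestamp is one event interval (about 5 seconds) older than the record timestamp used for window assignment.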

Together with the window function:


private static class TimeWindowTest implements WindowFunction<
      Car,
      Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>,
      Tuple,
      TimeWindow> {
    @Override
    public void apply(
          Tuple tuple,
          TimeWindow timeWindow,
          Iterable<Car> iterable,
          Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector)
          throws Exception {
        String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
        Set<Long> timestamps = new HashSet<Long>();

        for( Car c : iterable){
            timestamps.add(c.getTimestamp());
        }

        System.out.println( s +timestamps +"\n\n");
    }
}

I run them with the following pipeline:

stream
   .filter(new FilterFunction<Car>() {
      @Override
      public boolean filter(Car car) throws Exception {
         return car.getId().equals("car.330");
      }
   })
   .keyBy("id")
   .map(new BearingMap())
   .keyBy("id")
   .window(TumblingEventTimeWindows.of(Time.seconds(10)))
   .apply(new TimeWindowTest());

So when an event e1 arrives at the map operator, it is stored in the
ValueState and only forwarded once the next element e2 arrives, which
is about 5 seconds later. This produces the output below: in every
window, one element has a timestamp roughly 5 seconds before the
start of the window.

Differenz: -5013.0
Differenz: -5014.0
Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
[1484564686236, 1484564691260]


Differenz: -5009.0
Differenz: -5007.0
Zeitfenster: 1484564700000 - 1484564710000
[1484564696273, 1484564701287]


Differenz: -5005.0
Differenz: -5014.0
Zeitfenster: 1484564710000 - 1484564720000
[1484564706296, 1484564711303]
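
To make the mismatch concrete, the window boundaries can be recomputed by hand. The sketch below is plain Java with hypothetical names and does not use Flink; it assumes tumbling windows without offset, so the window start is the timestamp rounded down to a multiple of the window size. The two timestamps are taken from the first window in the output above.

```java
public class WindowAssignment {

    // Start of the tumbling window (no offset) that contains the timestamp.
    // Valid for non-negative timestamps, which is the case for epoch millis here.
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 10_000L;       // corresponds to Time.seconds(10)
        long e1 = 1484564686236L;  // timestamp stored inside the forwarded Car
        long e2 = 1484564691260L;  // timestamp of the event that triggered the forward

        long w1 = windowStart(e1, size);
        long w2 = windowStart(e2, size);
        System.out.println("window by e1: " + w1 + " - " + (w1 + size));
        System.out.println("window by e2: " + w2 + " - " + (w2 + size));
    }
}
```

By its own timestamp, e1 would belong to the window 1484564680000 - 1484564690000, yet it is printed in 1484564690000 - 1484564700000, which is the window that matches the timestamp of e2.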


Best,

Nico



2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:

> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks()
> somewhere in there as well, as in:
>
> stream
>       // or somewhere else in the pipeline
>       .assignTimestampsAndWatermarks(new TimestampGenerator())
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> Just checking, to make sure. If you have this we might have to dig a
> little deeper. Could you also please try to print the whole output of
> your apply() method in one go, i.e. collect all the output in a String and
> then make a single call to System.out.println()? It could be that the
> output in the terminal is not completely in order.
>
> Cheers,
> Aljoscha
>
> On Mon, 2 Jan 2017 at 15:04 Nico <nico.franzeck@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> thank you for having a look. Actually there is not too much code based on
>> timestamps:
>>
>> stream
>>       .keyBy("id")
>>       .map(...)
>>       .filter(...)
>>       .map(...)
>>       .keyBy("areaID")
>>       .map(new KeyExtractor())
>>       .keyBy("f1.areaID","f0.sinterval")
>>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>       .apply(new TrafficInformation());
>>
>> The map functions only enrich the data and don't change anything related
>> to the timestamp.
>>
>> the apply function is:
>>
>> @Override
>> public void apply(
>> Tuple key,
>> TimeWindow timeWindow,
>> Iterable<Tuple2<DirectionInterval, Car>> cars,
>> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>>
>> System.out.println("Start: " +timeWindow.getStart());
>> System.out.println("End: " +timeWindow.getEnd());
>>
>> for(Tuple2<DirectionInterval, Car> t : cars){
>> System.out.println(t.f1);
>> }
>>
>> System.out.println(t.f1) prints all information about a car, in which the
>> timestamp is embedded. The system gets the timestamp from the class:
>>
>> public class TimestampGenerator
>>         extends BoundedOutOfOrdernessTimestampExtractor<Car> {
>>
>>     public TimestampGenerator(Time maxOutOfOrderness){
>>         super(maxOutOfOrderness);
>>     }
>>
>>     @Override
>>     public long extractTimestamp(Car car) {
>>         return car.getTimestamp();
>>     }
>> }
>>
>> Example output is presented in the previous post... it looks like the
>> timestamp is rounded... I am confused :-/
>>
>> Best,
>> Nico
>>
>> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:
>>
>> Hi,
>> could you please share code (and example data) for producing this output.
>> I'd like to have a look.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franzeck@gmail.com> wrote:
>>
>> Hi @all,
>>
>> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
>> During this I found a strange behavior (at least for me) in the assignment
>> of events.
>>
>> The first element of a new window is actually always part of the old
>> window. I thought the events were late, but then they would be dropped
>> instead of being assigned to the new window. Even with an allowedLateness
>> of 10s the behavior remains the same.
>>
>> I used timeWindow.getStart() and getEnd() to get the boundaries
>> of the window.
>>
>> Can someone explain this?
>>
>> Best,
>> Nico
>>
>>
>> TimeWindows with Elements:
>>
>> Start: 1482332940000 - End: 1482332960000
>> timestamp=1482332952907
>>
>> Start: 1482332960000 - End: 1482332980000
>> timestamp=1482332958929
>> timestamp=1482332963995
>> timestamp=1482332969027
>> timestamp=1482332974039
>>
>> Start: 1482332980000 - End: 1482333000000
>> timestamp=1482332979059
>> timestamp=1482332984072
>> timestamp=1482332989081
>> timestamp=1482332994089
>>
>> Start: 1482333000000 - End: 1482333020000
>> timestamp=1482332999113
>> timestamp=1482333004123
>> timestamp=1482333009132
>> timestamp=1482333014144
>>
>>
>>
