flink-user mailing list archives

From Dawid Wysakowicz <dwysakow...@apache.org>
Subject Re: How to debug difference between Kinesis and Kafka
Date Thu, 21 Feb 2019 13:36:30 GMT
It is definitely a solution ;)

You should be aware of the downsides though:

  * you might get different results in case of reprocessing
  * you might drop some data as late if, due to delays in processing,
    events arrive later than the "ProcessingTime" threshold
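
To make both downsides concrete, here is a rough sketch of the blended
watermark from your mail, max(lastEventTimestamp - 1min, now - 5min).
BlendedWatermark and its methods are made-up names for illustration, not
Flink API. It tracks the highest timestamp seen so far (my assumption for
"lastEventTimestamp"), and the clock is passed in as a parameter so the
influence of wall-clock time is easy to see:

```java
/** Hypothetical helper (not part of Flink): blends event time with wall-clock time. */
final class BlendedWatermark {
    private final long eventLagMillis; // allowed out-of-orderness, e.g. 1 minute
    private final long idleLagMillis;  // wall-clock fallback lag, e.g. 5 minutes
    private long maxEventTimestamp = Long.MIN_VALUE; // MIN_VALUE means "no event seen yet"

    BlendedWatermark(long eventLagMillis, long idleLagMillis) {
        this.eventLagMillis = eventLagMillis;
        this.idleLagMillis = idleLagMillis;
    }

    /** Record an event; assumption: we keep the highest timestamp seen so far. */
    void onEvent(long eventTimestampMillis) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestampMillis);
    }

    /** max(lastEventTimestamp - eventLag, now - idleLag); the clock is a parameter. */
    long currentWatermark(long nowMillis) {
        long eventBased = (maxEventTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE                      // guard against underflow when idle
                : maxEventTimestamp - eventLagMillis;
        long clockBased = nowMillis - idleLagMillis;  // advances even on an idle shard
        return Math.max(eventBased, clockBased);
    }

    /** Simplification: an event at or behind the current watermark counts as late. */
    boolean isLate(long eventTimestampMillis, long nowMillis) {
        return eventTimestampMillis <= currentWatermark(nowMillis);
    }
}
```

With this, an idle shard still advances its watermark through the
clock-based term. The same term is what causes the downsides: if you
replay an hour-old recording, now - 5min is already ahead of every event
timestamp, so the replayed events end up behind the watermark, and any
event delayed by more than 5 minutes in a live run is late as well. In a
real job this logic would presumably live in the getCurrentWatermark()
method of your AssignerWithPeriodicWatermarks.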

Best,

Dawid

On 21/02/2019 14:18, Stephen Connolly wrote:
> Yes, it was the "watermarks for event time when no events for that
> shard" problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to
> ensure idle shards do not cause excessive data retention.
>
> Is that the best solution?
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz
> <dwysakowicz@apache.org> wrote:
>
>     Hi Stephen,
>
>     The Watermark of a single operator is the minimum of the
>     Watermarks received from all of its inputs. Therefore, if one of
>     your shards/operators has no incoming data, it will not produce
>     Watermarks, and the Watermark of the WindowOperator will not
>     progress. So this is sort of expected behavior.
>
>     I recommend reading the docs linked by Congxian, especially this
>     section[1].
>
>     Best,
>
>     Dawid
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>
>     On 19/02/2019 14:31, Stephen Connolly wrote:
>>     Hmmm my suspicions are now quite high. I created a file source
>>     that just replays the events straight then I get more results....
>>
>>     On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
>>     <stephen.alan.connolly@gmail.com> wrote:
>>
>>         Hmmm after expanding the dataset such that there was
>>         additional data that ended up on shard-0 (everything in my
>>         original dataset was coincidentally landing on shard-1) I am
>>         now getting output... should I expect this kind of behaviour
>>         if no data arrives at shard-0 ever?
>>
>>         On Tue, 19 Feb 2019 at 11:14, Stephen Connolly
>>         <stephen.alan.connolly@gmail.com> wrote:
>>
>>             Hi, I’m having a strange situation and I would like to
>>             know where I should start trying to debug.
>>
>>             I have set up a configurable swap-in source, with three
>>             implementations:
>>
>>             1. A mock implementation
>>             2. A Kafka consumer implementation
>>             3. A Kinesis consumer implementation
>>
>>             From injecting a log and no-op map function I can see
>>             that all three sources pass through the events correctly.
>>
>>             I then have a window based on event timestamps, and from
>>             inspecting the aggregation function I can see that the
>>             data is getting aggregated. I’m using the
>>             `.aggregate(AggregateFunction, WindowFunction)` variant so
>>             that I can retrieve the key.
>>
>>             Here’s the strange thing: I only change the source (and
>>             each source uses the same deserialization function) but:
>>
>>               * When I use either Kafka or my Mock source, the
>>                 WindowFunction gets called as events pass the end of
>>                 the window
>>               * When I use the Kinesis source, however, the window
>>                 function never gets called. I have even tried
>>                 injecting events into kinesis with really high
>>                 timestamps to flush the watermarks in my
>>                 BoundedOutOfOrdernessTimestampExtractor... but nothing
>>
>>             I cannot see how this source switching could result in
>>             such a different behaviour:
>>
>>                 Properties sourceProperties = new Properties();
>>                 ConsumerFactory sourceFactory;
>>                 String sourceName = configParams.getRequired("source");
>>                 switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>                     case "kinesis":
>>                         sourceFactory = FlinkKinesisConsumer::new;
>>                         copyOptionalArg(configParams, "aws-region",
>>                                 sourceProperties, AWSConfigConstants.AWS_REGION);
>>                         copyOptionalArg(configParams, "aws-endpoint",
>>                                 sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>                         copyOptionalArg(configParams, "aws-access-key",
>>                                 sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>                         copyOptionalArg(configParams, "aws-secret-key",
>>                                 sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>                         copyOptionalArg(configParams, "aws-profile",
>>                                 sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>                         break;
>>                     case "kafka":
>>                         sourceFactory = FlinkKafkaConsumer010::new;
>>                         copyRequiredArg(configParams, "bootstrap-server",
>>                                 sourceProperties, "bootstrap.servers");
>>                         copyOptionalArg(configParams, "group-id",
>>                                 sourceProperties, "group.id");
>>                         break;
>>                     case "mock":
>>                         sourceFactory = MockSourceFunction::new;
>>                         break;
>>                     default:
>>                         throw new RuntimeException(
>>                                 "Unknown source '" + sourceName + '\'');
>>                 }
>>
>>                 // set up the streaming execution environment
>>                 final StreamExecutionEnvironment env =
>>                         StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>                 // poll watermark every second because using
>>                 // BoundedOutOfOrdernessTimestampExtractor
>>                 env.getConfig().setAutoWatermarkInterval(1000L);
>>                 env.enableCheckpointing(5000);
>>
>>                 SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
>>                         configParams.getRequired("topic"),
>>                         new ObjectNodeDeserializationSchema(),
>>                         sourceProperties
>>                 ))
>>                         // the use of ConsumerFactory erases the type info so add it back
>>                         .returns(ObjectNode.class)
>>                         .name("raw-events")
>>                         .assignTimestampsAndWatermarks(
>>                                 new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>>                                         "timestamp", Time.seconds(5))
>>                         )
>>                         .split(new JsonNodeOutputSelector("eventType"));
>>             ...
>>                 eventsByType.select(...)
>>                         .keyBy(new JsonNodeStringKeySelector("_key"))
>>                         .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>>                                 Time.seconds(windowDuration),
>>                                 (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>>                         .trigger(EventTimeTrigger.create())
>>                         // <==== The CountsAggregator is seeing the data
>>                         .aggregate(new CountsAggregator<>(), new KeyTagger<>())
>>                         // <==== HERE is where we get no output from Kinesis...
>>                         // but Kafka and my Mock are just fine!
>>                         .print()
>>
>>
