flink-issues mailing list archives

From "suganya (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing
Date Fri, 12 Jan 2018 05:10:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323546#comment-16323546
] 

suganya edited comment on FLINK-8413 at 1/12/18 5:09 AM:
---------------------------------------------------------

The following code uses the Apache Beam libraries to create the pipeline:

 public void run(String[] args) {
      // Parse and validate the command-line options.
      BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation()
          .as(BeamCLIOptions.class);
      Pipeline pipeline = Pipeline.create(beamCliOptions);
      MergeDistribution mergeDistribution = MergeDistribution
          .valueOf(beamCliOptions.getMergeDistribution());
      MergeDistribution fixedWindowDuration = MergeDistribution
          .valueOf(beamCliOptions.getFixedWindowSize());

      // Kafka source: String key/value pairs from the configured topic.
      KafkaIO.Read<String, String> kafkaEntityStreamReader = KafkaIO.<String, String>read()
          .withBootstrapServers(beamCliOptions.getKafkaServers())
          .withTopic(beamCliOptions.getKafkaTopic())
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .updateConsumerProperties(ImmutableMap.of(
              "auto.offset.reset", "latest",
              "enable.auto.commit", "true"));

      pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
          .apply(Values.create())
          // Fixed windows with a repeated processing-time trigger;
          // fired panes are discarded, no lateness is allowed.
          .apply(Window.<String>into(
              FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
          .apply(ParDo.of(new ExtractDataFn(
              beamCliOptions.getDatePartitionKey(),
              new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
          .apply("Applying GroupByKey on YYYY-MM-DD HH", GroupByKey.create())
          .apply("Applying Merge", ParDo.of(new MergeDataFn(beamCliOptions)));

      pipeline.run();
    }
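For reference, checkpointing for a Beam pipeline on the Flink runner is enabled through the runner's pipeline options rather than in the pipeline code itself. A minimal sketch of how such an invocation might look on the command line (the jar name and broker/topic values are placeholders; --kafkaServers and --kafkaTopic are assumed to come from the custom BeamCLIOptions interface above, and exact runner option names can vary by Beam version):

```shell
# Illustrative invocation; "pipeline.jar" is a placeholder for the packaged job.
# --checkpointingInterval is in milliseconds (300000 ms = 5 minutes) and maps to
# the Flink runner's FlinkPipelineOptions; a non-positive value leaves Flink
# checkpointing disabled.
java -jar pipeline.jar \
  --runner=FlinkRunner \
  --kafkaServers=broker-1:9092 \
  --kafkaTopic=events \
  --checkpointingInterval=300000
```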


> Snapshot state of aggregated data is not maintained in flink's checkpointing
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8413
>                 URL: https://issues.apache.org/jira/browse/FLINK-8413
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: suganya
>
> We have a project that consumes events from Kafka, performs a groupBy in a 5-minute time
> window, and, once the window elapses, pushes the events downstream for a merge. The project
> is deployed on Flink, and we have enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Kafka offsets are checkpointed every 5 minutes (checkpointingInterval). The event offsets are
> checkpointed before the entire DAG (groupBy and merge) has finished. So after any task-manager
> restart, the new task starts from the last successful checkpoint, but we are unable to recover
> the aggregated snapshot data (the data from the groupBy task) from the persisted checkpoint.
> We can retrieve the last successfully checkpointed Kafka offsets, but not the aggregated data
> accumulated up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
