flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lasse Nedergaard <lassenederga...@gmail.com>
Subject ConnectedIterativeStreams and processing state 1.4.2
Date Tue, 01 May 2018 08:20:04 GMT

I have a case where I have a input stream that I want to enrich with
external data. I want to cache some of the external lookup data to improve
the overall performances.
To update my cache (a CoProcessFunction) I would use iteration to send the
external enriched information back to the cache and update a mapstate. I
use CoProcesFunction as the input stream and the enrich stream contains 2
diff.object types and I don't want to mix them.
Because I use a ConnectedIterativeStream I can't use state in my
CoProcessFunction because the ConnectedIterativeStream create a DataStream
based on the Feedback signature and not the stream I close the iteration
with and it is not possible to provide a keySelector in the withFeedbackType

Form Flink source

public ConnectedIterativeStreams(DataStream<I> input,
TypeInformation<F> feedbackType, long waitTime) {
    super(input.getExecutionEnvironment(), input, new
DataStream(input.getExecutionEnvironment(), new
CoFeedbackTransformation(input.getParallelism(), feedbackType,

and both streams need to be keyed before state are assigned to the operator.

Any ideas how to workaround this problem?

My sudo code is as below.

IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData>
iteration = inputStream
        .keyBy(obj -> obj.getkey))
TypeHint<EnrichData>() {}));

DataStream<ReportMessageBase> enrichedStream = iteration
        .process(new EnrichFromState());

DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
        .filter(obj -> obj.enriched);

EnrichService EnrichService = new EnrichService();
DataStream<InputObject> enrichedFromApi =

DataStream<EnrichData> newEnrich = enrichedFromApi
        .map(obj -> {

            EnrichData newData =  new EnrichData();
            newData.xx = obj.xx();

            return newData;
        .keyBy(obj -> obj.*getkey*);



View raw message