flink-user mailing list archives

From Lasse Nedergaard <lassenederga...@gmail.com>
Subject ConnectedIterativeStreams and processing state 1.4.2
Date Tue, 01 May 2018 08:20:04 GMT
Hi.

I have a case where I want to enrich an input stream with external data. I
want to cache some of the external lookup data to improve the overall
performance.
To update my cache (a CoProcessFunction), I would use an iteration to send
the externally enriched information back to the cache and update a MapState.
I use a CoProcessFunction because the input stream and the enrich stream
contain two different object types and I don't want to mix them.
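The intended cache operator can be sketched in plain Java to show what its two inputs do. This is a minimal model under stated assumptions, not Flink API code: `EnrichCacheModel`, `Report`, `onInput` and `onEnrich` are hypothetical names standing in for the two `processElement` callbacks of the CoProcessFunction, string keys stand in for InputObject/EnrichData, and a `HashMap` stands in for the MapState.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the cache operator (not Flink API).
// onInput/onEnrich play the roles of the two processElement callbacks of a
// CoProcessFunction; the HashMap stands in for the MapState.
class EnrichCacheModel {
    // Hypothetical output type, mirroring the "enriched" flag on ReportMessageBase.
    static final class Report {
        final String key;
        final String data;      // null when the element could not be enriched
        final boolean enriched;

        Report(String key, String data) {
            this.key = key;
            this.data = data;
            this.enriched = data != null;
        }
    }

    private final Map<String, String> cache = new HashMap<>();

    // First input (InputObject): enrich from the cache if the key is present.
    Report onInput(String key) {
        return new Report(key, cache.get(key));
    }

    // Second input (EnrichData arriving over the feedback edge): update the cache.
    void onEnrich(String key, String data) {
        cache.put(key, data);
    }

    public static void main(String[] args) {
        EnrichCacheModel op = new EnrichCacheModel();
        System.out.println(op.onInput("k1").enriched); // cache miss
        op.onEnrich("k1", "external-data");            // feedback arrives
        System.out.println(op.onInput("k1").enriched); // served from the cache
    }
}
```

The two input types never have to share a common wrapper, which is the reason for choosing a two-input function over a union of both streams.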

Because I use ConnectedIterativeStreams, I can't use keyed state in my
CoProcessFunction. ConnectedIterativeStreams creates the second (feedback)
DataStream from the feedback type signature, not from the stream I close the
iteration with, and it is not possible to provide a KeySelector in
withFeedbackType().

From the Flink source:

public ConnectedIterativeStreams(DataStream<I> input,
        TypeInformation<F> feedbackType,
        long waitTime) {
    super(input.getExecutionEnvironment(),
            input,
            new DataStream(input.getExecutionEnvironment(),
                    new CoFeedbackTransformation(input.getParallelism(),
                            feedbackType,
                            waitTime)));
}

Both streams need to be keyed before keyed state can be used in the operator.
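Why both inputs must be keyed by the same key can be illustrated with a simplified plain-Java model. Assumptions: `KeyRoutingModel` and its methods are hypothetical; each parallel instance is modeled as its own `HashMap`, since keyed state lives on the instance that owns a key; keys are routed with `Math.floorMod(key.hashCode(), parallelism)` rather than Flink's real key-group hashing; and an unkeyed feedback record is modeled as round-robin, which is only an illustration, not a description of how Flink routes the feedback edge. In Flink the symptom is different (keyed state is not available unless both inputs are keyed), but the model shows why the key matters on both sides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Flink's real partitioning, which hashes keys into
// key groups): each parallel instance holds its own cache, like keyed state.
class KeyRoutingModel {
    private final List<Map<String, String>> instances = new ArrayList<>();
    private int roundRobin = 0;

    KeyRoutingModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
    }

    // A keyed input sends every record with the same key to the same instance.
    int keyedInstance(String key) {
        return Math.floorMod(key.hashCode(), instances.size());
    }

    // Enrich record keyed by the same key: lands where the inputs for that key are read.
    void feedbackKeyed(String key, String data) {
        instances.get(keyedInstance(key)).put(key, data);
    }

    // Enrich record without a key selector (modeled as round-robin):
    // it may land on an instance that never sees inputs with this key.
    void feedbackUnkeyed(String key, String data) {
        instances.get(roundRobin).put(key, data);
        roundRobin = (roundRobin + 1) % instances.size();
    }

    // Cache lookup performed on behalf of the keyed input side.
    String lookup(String key) {
        return instances.get(keyedInstance(key)).get(key);
    }

    public static void main(String[] args) {
        KeyRoutingModel keyed = new KeyRoutingModel(4);
        keyed.feedbackKeyed("k", "v");
        System.out.println(keyed.lookup("k"));   // found on the owning instance

        KeyRoutingModel unkeyed = new KeyRoutingModel(4);
        unkeyed.feedbackUnkeyed("k", "v");       // written to instance 0
        System.out.println(unkeyed.lookup("k")); // read on instance 3, so a miss
    }
}
```

With parallelism 4, `"k".hashCode()` is 107, so the keyed side reads instance 3 while the round-robin write went to instance 0.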

Any ideas on how to work around this problem?

My pseudocode is below.

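import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// First input: InputObject (keyed); second input: EnrichData from the feedback edge.
IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
        .keyBy(obj -> obj.getKey())
        .iterate(maxWaitForIterations)
        .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));

// Enrich from the cached MapState; the cache is updated from the second input.
DataStream<ReportMessageBase> enrichedStream = iteration
        .process(new EnrichFromState());

// Keep only the elements that could not be enriched from the cache.
DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
        .filter(obj -> !obj.enriched);

// Look up the missing data in the external service.
EnrichService enrichService = new EnrichService();
DataStream<InputObject> enrichedFromApi =
        enrichService.parse(notEnrichedOutput);

// Turn the external results into cache updates, keyed like the input stream.
DataStream<EnrichData> newEnrich = enrichedFromApi
        .map(obj -> {
            EnrichData newData = new EnrichData();
            newData.xx = obj.xx();
            return newData;
        })
        .keyBy(obj -> obj.getKey());

// Feed the cache updates back as the second input of EnrichFromState.
iteration.closeWith(newEnrich);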

....
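The data flow of the pseudocode (enrich from state, take the not-enriched branch, call the external service, feed the results back with closeWith) can be traced in plain Java to see how the cache reduces external calls. This is a sketch, not Flink API code: `EnrichLoopModel` and `externalLookup` are hypothetical, a `HashMap` stands in for EnrichFromState's MapState, and each cache update is assumed to reach the cache before the next input is processed. In a real iteration the feedback is asynchronous, so inputs that arrive while a lookup is still in flight would also miss.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java trace of the iteration (not Flink API).
// Assumption: each fed-back cache update is applied before the next input.
class EnrichLoopModel {
    private final Map<String, String> cache = new HashMap<>(); // EnrichFromState's state
    int externalCalls = 0;                                     // calls to the external service

    // Hypothetical stand-in for EnrichService: an expensive external lookup.
    private String externalLookup(String key) {
        externalCalls++;
        return "data-for-" + key;
    }

    List<String> run(List<String> inputKeys) {
        List<String> output = new ArrayList<>();
        for (String key : inputKeys) {
            // process(new EnrichFromState()): first input, try the cache.
            String cached = cache.get(key);
            if (cached != null) {
                output.add(key + "=" + cached);
                continue;
            }
            // Not-enriched branch (notEnrichedOutput): ask the external service.
            String fetched = externalLookup(key);
            output.add(key + "=" + fetched);
            // map(...) to EnrichData and closeWith(...): second input updates the cache.
            cache.put(key, fetched);
        }
        return output;
    }

    public static void main(String[] args) {
        EnrichLoopModel loop = new EnrichLoopModel();
        System.out.println(loop.run(Arrays.asList("a", "b", "a", "a")));
        System.out.println("external calls: " + loop.externalCalls);
    }
}
```

Running it prints `[a=data-for-a, b=data-for-b, a=data-for-a, a=data-for-a]` and `external calls: 2`: four inputs over two distinct keys cost two external lookups instead of four.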
