flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From AndreaKinn <kinn6...@hotmail.it>
Subject How to fill flink's datastream
Date Sat, 02 Sep 2017 17:23:21 GMT
Excuse me for the unclear title but I don't know how to summarise the
I'm using an external library integrated with Flink called Flink-HTM. It is
still a prototype.
Internally, it performs everything I want but I have a problem returning
evaluated values in a  printable datastream.
I posted here my question because I believe the problem is tied with Flink
and not with the library.

Essentially I have the following code in my main:

*/DataStream<Double> result = HTM.learn(kafkaStream, new
				.select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
                    public Double select(Tuple2<Harness.KafkaRecord,
NetworkInference> inference) throws Exception {
						return inference.f1.getAnomalyScore();

Then I want to print the datastream "result".
Following the /learn/ method the flink-htm lib correctly performs many
operations on data. 
At the end of this computation, in another class I have a /DataStream<T,
NetworkInference>/ and essentially I have to call the overridden "/select/"
method on that/ Datastream<T,NetworkInference>/.

The code which would do that is:

*/final DataStream<Tuple2&lt;T, NetworkInference>> inferenceStream =
           return inferenceStream
                .map(new InferenceSelectMapper<T,
                .returns(returnType);    / 
where /map/ and /returns/ methods are described in Flink's

*/public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper)

		TypeInformation<R> outType =
TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
				Utils.getCallLocationName(), true);

		return transform("Map", outType, new StreamMap<>(clean(mapper)));

*/public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)
		requireNonNull(typeInfo, "TypeInformation must not be null");
		return this;

while /InferenceSelectMapper<T,R>/ is the following class:

*/private static class InferenceSelectMapper<T, R> implements
MapFunction<Tuple2&lt;T, NetworkInference>, R> {

	private final InferenceSelectFunction<T, R> inferenceSelectFunction;

        public InferenceSelectMapper(InferenceSelectFunction<T, R>
inferenceSelectFunction) {
        		this.inferenceSelectFunction = inferenceSelectFunction;

        public R map(Tuple2<T, NetworkInference> value) throws Exception {
        		return inferenceSelectFunction.select(value);

which implements Flink's /MapFunction/. I absolutely need the program call
the /InferenceSelectMapper.map()/ method to call my defined "/select/"
function, unfortunately this doesn't happen. As consequence of that, in main
method and in the IDE console, I suppose the /DataStream result/ is not
filled and none output is printed, which is the my fundamental problem.

Since I'm not a Flink expert I don't know how to perform many operations at
"lower level".
Honestly I don't understand exactly what /map/ and /returns/ methods of
/DataStream.class/ do. I thought a lot about it and I also tried to find a
way to call /InferenceSelectMapper.map()/ method but I don't know how to
extract the /Tuple2<T, NetworkInference>/ from the

I'm absolutely sure that the /map/ function I need in
/InferenceSelectMethod/ is not called because it doesn't appear in call
hierarchy and also adding a print instruction that is not showed. 

Please, can you help me to solve this? I've been stuck on it for a week
while the lib's owner doesn't reply to my mails. 
Sorry for the length.

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

View raw message