flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Biplob Biswas <revolutioni...@gmail.com>
Subject Re: Extracting Timestamp in MapFunction
Date Wed, 01 Jun 2016 14:33:09 GMT
Hi,

Before giving the method u described above a try, i tried adding the
timestamp with my data directly at the stream source.

Following is my stream source:

http://pastebin.com/AsXiStMC

and I am using the stream source as follows:

DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath,
streamSpeed));
		ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster =
tuples.iterate()
														.withFeedbackType(MicroCluster[].class);
		//mcStream.broadcast().global();
		DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
														.flatMap(new MyCoFlatmap(k,tw))
														.keyBy(1)
														.reduce(new ReduceMC(k))
														.map(new ReturnMC());
		
		inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is, when i execute this, all the 4 different partition gets the
same data, I don't really understand how is the same data sent to all the 4
partitions when it should 4 different data tuple to 4 different partitions.

Can you maybe explain this behaviour? 



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message