flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject Re: Regarding Broadcast of datasets in streaming context
Date Wed, 11 May 2016 08:00:06 GMT
Hi Gyula,

I tried something like the following in the two flatMap methods, but I am not
getting the desired results and am still confused about how the concept you put
forward would work:

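public static final class MyCoFlatmap
        implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // Local copy of all centroids (micro-clusters) on this parallel instance.
    // flag, id, numofMC and timestamp are not declared in this snippet; they
    // are assumed to be fields defined elsewhere.
    Centroid[] centroids;

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        // Allocate the array on the first point only.
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
        if (id < numofMC) {
            // The first numofMC points each seed a new centroid.
            System.out.println(id);
            Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Later points are inserted into the closest centroid if they
            // fall within its radius.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double distance = distance(in.pt, mc.getCenter());
                if (distance < minDistance) {
                    closestMC = mc;
                    minDistance = distance;
                }
            }
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
        // Overwrite the local copy with the centroid received on the second
        // input. Note: centroids is still null here if no point has arrived yet.
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}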

As mentioned in my previous reply, I understand that each of the map
functions in the co-flatMap receives one tuple at a time. So if I have a
DataStream of centroids, they would arrive one at a time on the partitions,
and that would defeat the purpose, because I need all of the centroids to
compare distances against.
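
One way to read the co-flatMap idea, as a minimal plain-Java sketch and not
Flink code: the operator instance is a long-lived object, so whatever the
centroid-side method stores in a field is still there on the next point-side
call. The centroids do arrive one at a time, but they accumulate in local
state, and each point is compared against everything received so far. The
class LocalCentroidState, its methods onPoint and onCentroid, and the 1-D
centers are hypothetical stand-ins for the CoFlatMapFunction above.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for one parallel instance of a co-flatMap operator.
// All names here are hypothetical; no Flink classes are used.
class LocalCentroidState {

    // Centroid id -> 1-D center, kept across calls as instance state.
    private final Map<Integer, Double> centers = new HashMap<>();

    // Plays the role of flatMap2: one centroid arrives and is stored,
    // replacing any earlier copy with the same id.
    void onCentroid(int id, double center) {
        centers.put(id, center);
    }

    // Plays the role of flatMap1: one point arrives and is compared against
    // every centroid received so far. Returns the closest id, or -1 if no
    // centroid has arrived yet.
    int onPoint(double point) {
        int closestId = -1;
        double minDistance = Double.MAX_VALUE;
        for (Map.Entry<Integer, Double> e : centers.entrySet()) {
            double distance = Math.abs(point - e.getValue());
            if (distance < minDistance) {
                minDistance = distance;
                closestId = e.getKey();
            }
        }
        return closestId;
    }

    public static void main(String[] args) {
        LocalCentroidState state = new LocalCentroidState();
        state.onCentroid(0, 1.0);   // centroids arrive one at a time...
        state.onCentroid(1, 10.0);
        System.out.println(state.onPoint(8.5)); // ...but the point sees both: prints 1
        state.onCentroid(1, 20.0);  // an update replaces the stored copy
        System.out.println(state.onPoint(8.5)); // prints 0
    }
}
```

The sketch only covers a single instance; how each parallel instance gets
every centroid is a separate question.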

I tried storing the centroids in an array of Centroid, but again I don't
understand how I can push all of the changes back.
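
To make "pushing the changes back" concrete, here is a plain-Java simulation,
not Flink code, of what I assume the intended wiring is: the instance that
handles a point emits the updated centroid, and that update is delivered to
every parallel instance, each of which overwrites its own copy. The class
FeedbackSketch, the Instance type, the broadcast helper, and the "move halfway
toward the point" update rule are all hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of broadcasting centroid updates back to every
// parallel instance. All names are hypothetical; no Flink classes are used.
class FeedbackSketch {

    // One parallel instance with its own copy of the centroid centers (1-D).
    static final class Instance {
        final double[] centers;

        Instance(double[] initial) {
            this.centers = initial.clone();
        }

        // Point side: find the closest centroid and compute its new center
        // (placeholder rule: halfway toward the point). The update is
        // returned as {id, newCenter} instead of being applied locally.
        double[] onPoint(double point) {
            int closest = 0;
            for (int i = 1; i < centers.length; i++) {
                if (Math.abs(point - centers[i]) < Math.abs(point - centers[closest])) {
                    closest = i;
                }
            }
            double moved = (centers[closest] + point) / 2.0;
            return new double[] {closest, moved};
        }

        // Centroid side: apply an update that came back through the feedback.
        void onCentroid(int id, double center) {
            centers[id] = center;
        }
    }

    // Deliver one update to every instance, the way a broadcast would.
    static void broadcast(List<Instance> instances, double[] update) {
        for (Instance instance : instances) {
            instance.onCentroid((int) update[0], update[1]);
        }
    }

    public static void main(String[] args) {
        double[] initial = {0.0, 10.0};
        List<Instance> instances = new ArrayList<>();
        instances.add(new Instance(initial));
        instances.add(new Instance(initial));

        // A point lands on instance 0 only; the resulting update reaches both.
        double[] update = instances.get(0).onPoint(8.0);
        broadcast(instances, update);
        System.out.println(instances.get(1).centers[1]); // prints 9.0
    }
}
```

In this simulation the changes never stay local: every change travels as a
single-centroid update, so one-at-a-time delivery is enough to keep all copies
in sync.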

A small example or code snippet would be really helpful.

Thanks a lot

Regards
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6816.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
