flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject Re: Regarding Broadcast of datasets in streaming context
Date Sun, 15 May 2016 17:17:50 GMT
Hi,

I read that article already, but it is very simplistic. Based on that
article and other examples, I was trying to understand how my centroids can
be sent to all the partitions and updated accordingly.

I also understood that the order of the input and the feedback stream can't
be determined, but I was expecting the feedback to be broadcast after every
collect call so that all the partitions receive the updated values.
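To make the difference concrete, here is a plain-Java model of what `broadcast()` changes on the feedback edge. It has no Flink dependency, and the class and method names (`BroadcastModel`, `replicas`, `sendToOne`) are hypothetical. Each parallel instance of the co-flatmap holds its own copy of the centroids. A broadcast feedback record is delivered to every copy, whereas a record sent with any non-broadcast partitioning reaches exactly one instance. The model deliberately ignores timing, which is the part that cannot be determined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model, not Flink code: each element of the list stands for the
// local centroid copy held by one parallel instance of the co-flatmap.
class BroadcastModel {

    // `parallelism` independent copies, all starting from `initial`.
    static List<long[]> replicas(int parallelism, long[] initial) {
        List<long[]> copies = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            copies.add(initial.clone());
        }
        return copies;
    }

    // Broadcast partitioning: every instance receives the feedback record.
    static void broadcast(List<long[]> copies, long[] update) {
        for (int p = 0; p < copies.size(); p++) {
            copies.set(p, update.clone());
        }
    }

    // Any non-broadcast partitioning: the record reaches exactly one instance.
    static void sendToOne(List<long[]> copies, int target, long[] update) {
        copies.set(target, update.clone());
    }

    public static void main(String[] args) {
        long[] update = {5, 7};

        List<long[]> one = replicas(3, new long[] {0, 0});
        sendToOne(one, 0, update);
        one.forEach(c -> System.out.println(Arrays.toString(c)));
        // [5, 7], [0, 0], [0, 0]: two instances keep the old centroids

        List<long[]> all = replicas(3, new long[] {0, 0});
        broadcast(all, update);
        all.forEach(c -> System.out.println(Arrays.toString(c)));
        // [5, 7] on all three instances
    }
}
```

Even with broadcast, each instance applies the update whenever the record happens to arrive. So broadcast only guarantees that every copy eventually sees the update. It does not guarantee when.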

But now I am confused about how this whole iteration-and-broadcast approach
can even help me maintain a central state of centroids.

I have even tried something similar to this: 

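    import java.util.Arrays;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class CentroidIterationTest {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Input 1 of the co-flatmap: the points.
            DataStream<Long> mainInput = env.generateSequence(2, 30);

            // Input 2 is the feedback edge, carrying the centroid array.
            // iterate() without a timeout: the iteration head waits for feedback indefinitely.
            IterativeStream.ConnectedIterativeStreams<Long, Long[]> iteration =
                    mainInput.iterate().withFeedbackType(Long[].class);

            DataStream<Long[]> iterateHead = iteration
                    .flatMap(new CoFlatMapFunction<Long, Long[], Long[]>() {
                        long globalVal = 1;
                        Long[] arr; // this parallel instance's copy of the centroids
                        int i = 0;

                        @Override
                        public void flatMap1(Long value, Collector<Long[]> out) throws Exception {
                            // Allocate once; re-allocating on every point discarded all previous state.
                            if (arr == null) {
                                arr = new Long[10];
                            }
                            Thread.sleep(1000);
                            // Wrap around instead of overflowing the array after 10 points.
                            arr[i % arr.length] = value;
                            i++;
                            System.out.println("SEEING FROM INPUT 1: " + Arrays.toString(arr) + ", " + globalVal);
                            out.collect(arr); // sent back to the iteration head via the feedback edge
                        }

                        @Override
                        public void flatMap2(Long[] value, Collector<Long[]> out) throws Exception {
                            Thread.sleep(1000);
                            // With broadcast feedback, this instance may receive an update
                            // before it has seen any point, so arr can still be null here.
                            if (arr == null) {
                                arr = new Long[value.length];
                            }
                            for (int j = 0; j < value.length && j < arr.length; j++) {
                                arr[j] = value[j];
                            }
                            System.out.println("SEEING FROM INPUT 2: " + Arrays.toString(arr) + ", " + globalVal);

                            // Not re-emitted: collecting here would send the same array around the loop again.
                            //out.collect(value);
                        }
                    });

            // broadcast(): every parallel instance of the co-flatmap receives each feedback record.
            iteration.closeWith(iterateHead.broadcast());

            env.execute("Centroid iteration test");
        }
    }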

Here arr is the array of my centroids, and the value in flatMap1 is a point
coming from the input stream. I ran this example on a small streaming
scenario and looked at the results being printed.
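For reference, one common way to update centroids per point is the sequential (online) k-means rule. The point is assigned to its nearest centroid, and that centroid moves to the running mean of the points assigned to it. The sketch below is a plain-Java, one-dimensional version. The rule, the class name `OnlineKMeans` and the initial centroids are assumptions for illustration, since the snippet above only stores raw points in `arr`.

```java
import java.util.Arrays;

// Sequential (online) k-means in one dimension. The update rule is an assumed
// example; the original snippet only stores raw points in `arr`.
class OnlineKMeans {
    final double[] centroids;
    final long[] counts; // points assigned to each centroid so far

    OnlineKMeans(double[] initial) {
        centroids = initial.clone();
        counts = new long[initial.length];
    }

    // Index of the centroid closest to x (ties go to the lower index).
    int nearest(double x) {
        int best = 0;
        for (int j = 1; j < centroids.length; j++) {
            if (Math.abs(x - centroids[j]) < Math.abs(x - centroids[best])) {
                best = j;
            }
        }
        return best;
    }

    // Assign x to its nearest centroid and move that centroid to the running
    // mean of all points assigned to it.
    int update(double x) {
        int j = nearest(x);
        counts[j]++;
        centroids[j] += (x - centroids[j]) / counts[j];
        return j;
    }

    public static void main(String[] args) {
        OnlineKMeans km = new OnlineKMeans(new double[] {0.0, 30.0});
        for (long p = 2; p <= 30; p++) { // the same points as generateSequence(2, 30)
            km.update(p);
        }
        // prints [11.0, 25.5] [19, 10]
        System.out.println(Arrays.toString(km.centroids) + " " + Arrays.toString(km.counts));
    }
}
```

With initial centroids 0 and 30, points 2 to 20 end up on the first centroid (mean 11.0) and points 21 to 30 on the second (mean 25.5). Each call to `update` changes state that, in the streaming job, the other parallel instances would also need to see.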

I started working on this based on the idea that once collect is called, the
broadcast feeds the result back, so that on each iteration every point is
processed with the latest centroids.
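The catch with that idea is that a feedback record only exists after `collect` has sent it around the loop, while points keep arriving on input 1. The following plain-Java model replays a merged sequence of points and feedback for a single instance and records which centroid version each point was processed with. The names (`StaleReadModel`, `Event`, `versionsSeen`) are hypothetical, and the interleaving is one fixed order chosen by hand; Flink does not guarantee any particular one.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, not Flink code: a single co-flatmap instance consuming one
// possible interleaving of points (input 1) and centroid feedback (input 2).
class StaleReadModel {

    static final class Event {
        final boolean feedback; // true = feedback record, false = point
        final long value;       // the point, or the centroid version carried by the feedback

        private Event(boolean feedback, long value) {
            this.feedback = feedback;
            this.value = value;
        }

        static Event point(long x) { return new Event(false, x); }
        static Event feedback(long version) { return new Event(true, version); }
    }

    // For each point, in arrival order, the centroid version it was processed with.
    static List<Long> versionsSeen(List<Event> arrivals) {
        long localVersion = 0; // version of this instance's local centroid copy
        List<Long> seen = new ArrayList<>();
        for (Event e : arrivals) {
            if (e.feedback) {
                localVersion = e.value; // apply the update that came back around the loop
            } else {
                seen.add(localVersion); // a point only sees what has arrived so far
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Point 2 produces version 1 and point 3 version 2, but both updates
        // arrive only after points 3 and 4 have already been processed.
        List<Event> arrivals = List.of(
                Event.point(2), Event.point(3), Event.point(4),
                Event.feedback(1), Event.feedback(2), Event.point(5));
        System.out.println(versionsSeen(arrivals)); // prints [0, 0, 0, 2]
    }
}
```

Points 3 and 4 are processed with version 0 even though the update from point 2 is already in the loop. In a real job the length of this lag depends on timing and cannot be relied on.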

That's why I keep asking you and sending updates on what I have done and
what I am doing: unless I understand how this central state of centroids is
emulated, I can't proceed.

So could you please provide a small example, a snippet, or anything else that
helps me understand how you propose to keep a central state and when to
update it? Without this basic understanding I am not able to do anything.
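As a sketch of the kind of answer being asked for, here is one possible design. It is not taken from this thread or from Flink's API. A single writer applies every update in one place, for instance an operator running with parallelism 1. It publishes version-stamped snapshots that would be broadcast to the instances that read the centroids, and each reader keeps only the newest version it has seen, so delivery order no longer matters. `CentralCentroids`, `Owner`, `Replica` and `Snapshot` are hypothetical names. The update rule is an assumed online k-means step in which the nearest centroid moves to the running mean.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-writer pattern in plain Java (not Flink code).
class CentralCentroids {

    // Immutable, version-stamped copy of the centroids.
    static final class Snapshot {
        final long version;
        final double[] centroids;

        Snapshot(long version, double[] centroids) {
            this.version = version;
            this.centroids = centroids.clone(); // later owner updates cannot change it
        }
    }

    // The only place where centroids are modified.
    static final class Owner {
        private final double[] centroids;
        private final long[] counts;
        private long version = 0;

        Owner(double[] initial) {
            centroids = initial.clone();
            counts = new long[initial.length];
        }

        // Move the nearest centroid to the running mean and publish a new snapshot.
        Snapshot apply(double x) {
            int best = 0;
            for (int j = 1; j < centroids.length; j++) {
                if (Math.abs(x - centroids[j]) < Math.abs(x - centroids[best])) best = j;
            }
            counts[best]++;
            centroids[best] += (x - centroids[best]) / counts[best];
            version++;
            return new Snapshot(version, centroids);
        }
    }

    // Read-only copy held by each parallel instance that needs the centroids.
    static final class Replica {
        Snapshot current = new Snapshot(0, new double[0]);

        void onSnapshot(Snapshot s) {
            if (s.version > current.version) current = s; // ignore stale snapshots
        }
    }

    public static void main(String[] args) {
        Owner owner = new Owner(new double[] {0.0, 30.0});
        List<Snapshot> published = new ArrayList<>();
        for (double x : new double[] {2, 3, 28}) published.add(owner.apply(x));

        Replica r = new Replica();
        // Deliver the snapshots out of order: newest first, then the older ones.
        r.onSnapshot(published.get(2));
        r.onSnapshot(published.get(0));
        r.onSnapshot(published.get(1));
        System.out.println(r.current.version + " " + Arrays.toString(r.current.centroids));
        // prints 3 [2.5, 28.0]
    }
}
```

The trade-off is that the single writer becomes a throughput bottleneck, and readers may still work with a slightly old snapshot. The version check only removes the possibility of a reader going back to an older one.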

Thanks a lot.

Regards
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6932.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
