flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject Unexpected behaviour in datastream.broadcast()
Date Thu, 12 May 2016 09:17:11 GMT
Hi,

I am running the following sample code to understand how iteration and
broadcast work in a streaming context.

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
long i = 5;
DataStream<Long> mainInput = env.generateSequence(2, 8);
DataStream<Long> initialIterateInput = env.fromElements(i);

IterativeStream.ConnectedIterativeStreams<Long, Long> iteration =
        mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO);

DataStream<Long> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
            long globalVal = 1;

            @Override
            public void flatMap1(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(3000);
                System.out.println("SEEING FROM INPUT 1: " + value + ", " + globalVal);
                //globalVal = globalVal + value;
                out.collect(globalVal + value);
            }

            @Override
            public void flatMap2(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(1000);
                globalVal = value;
                System.out.println("SEEING FROM INPUT 2: " + value + ", " + globalVal);

                //out.collect(value);
            }
        });

iteration.closeWith(iterateHead.broadcast());

iterateHead.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

I was expecting that after out.collect(globalVal+value); is called, the
value would be broadcast to every partition, as specified by the closeWith
statement. I was also expecting the broadcast value to arrive at the
flatMap2 function and update globalVal in every partition.
Instead, the values are not broadcast and iterated the way I expected,
and I am getting the following output:

SEEING FROM INPUT 1: 2, 1
SEEING OUTPUT FROM ITERATION: 3
SEEING FROM INPUT 1: 3, 1
SEEING OUTPUT FROM ITERATION: 4
SEEING FROM INPUT 1: 4, 1
SEEING FROM INPUT 1: 5, 1
SEEING OUTPUT FROM ITERATION: 5
SEEING OUTPUT FROM ITERATION: 6
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 1: 6, 1
SEEING OUTPUT FROM ITERATION: 7
SEEING FROM INPUT 1: 7, 1
SEEING OUTPUT FROM ITERATION: 8
SEEING FROM INPUT 1: 8, 1
SEEING OUTPUT FROM ITERATION: 9
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
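
As a sanity check on the log above, here is a minimal plain-Java sketch (no Flink dependency) of what a broadcast feedback edge would be expected to do: every value emitted by flatMap1 is delivered to the flatMap2 of each parallel instance, and each instance keeps its own copy of globalVal. The names BroadcastFeedbackModel, Subtask, newSubtasks and run are made up for illustration. The model also assumes, as a simplification, that all main input is consumed before any feedback arrives and that input is spread round-robin; neither is something Flink guarantees.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: it only mimics the fan-out of a
// broadcast feedback edge over a fixed number of parallel instances.
public class BroadcastFeedbackModel {

    /** One parallel instance of the co-flat-map, with its own globalVal. */
    static final class Subtask {
        long globalVal = 1;
        int feedbackSeen = 0;

        // Mirrors flatMap1: emits globalVal + value.
        long flatMap1(long value) {
            return globalVal + value;
        }

        // Mirrors flatMap2: overwrites this instance's globalVal.
        void flatMap2(long value) {
            globalVal = value;
            feedbackSeen++;
        }
    }

    static List<Subtask> newSubtasks(int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        return subtasks;
    }

    /**
     * Sends each input to one subtask (round-robin, an assumption), then
     * broadcasts every emitted value to all subtasks.
     * Returns the total number of flatMap2 calls.
     */
    static int run(long[] inputs, List<Subtask> subtasks) {
        List<Long> feedback = new ArrayList<>();
        for (int i = 0; i < inputs.length; i++) {
            Subtask target = subtasks.get(i % subtasks.size());
            feedback.add(target.flatMap1(inputs[i]));
        }
        int calls = 0;
        for (long value : feedback) {
            for (Subtask subtask : subtasks) {
                subtask.flatMap2(value);
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = newSubtasks(4);            // env.setParallelism(4)
        int calls = run(new long[] {2, 3, 4, 5, 6, 7, 8}, subtasks);
        System.out.println("flatMap2 calls: " + calls);     // 7 values x 4 instances
    }
}
```

The model yields 28 flatMap2 calls, which matches the 28 "SEEING FROM INPUT 2" lines in the log (each of the values 3 to 9 appears exactly 4 times). So the values do seem to reach every parallel instance; what the model does not explain is the ordering relative to the first input.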


Can anyone please explain this behaviour? Why does the iteration happen only
after all the elements of the first input stream have been read? If it were
an infinite stream, would the iteration wait for it to finish?

Thanks and Regards



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-tp6848.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.