flink-user mailing list archives

From Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr>
Subject parallelism for window operations
Date Thu, 26 Jan 2017 21:30:00 GMT
Hi,

I have the following program, configured with parallelism 2.
After running it, I see that only 2 slots are busy.

How can I ensure that counts1 and counts2 each run in their own slots with the given parallelism
(in this case, 2 slots each)?

		port = params.getInt("port");

		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// default parallelism and max parallelism for all operators, taken from --paral (default 2)
		env.setParallelism(params.getInt("paral", 2));
		env.setMaxParallelism(params.getInt("paral", 2));

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream("localhost", port, "\n");

		// turn text lines into 8-field tuples (flatMap function elided)
		DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input =
				text.flatMap(...);

		// first count window (windowSize, slideSize), keyed by tuple field 0
		DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>,
						Double, Tuple, GlobalWindow>() {
					...
				});

		// second count window with the same sizes, keyed by tuple field 1
		DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>,
						Double, Tuple, GlobalWindow>() {
					...
				});

		// write each result stream to its own output path
		counts1.writeAsText(params.get("output1"));
		counts2.writeAsText(params.get("output2"));

		env.execute("Socket Window WordCount");
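
Why only 2 slots are busy can be sketched with a simplified model of Flink's slot sharing (an assumption for illustration, not Flink code): operators in the same slot sharing group may share slots, so a group needs as many slots as its highest operator parallelism, and distinct groups need distinct slots. In this job every operator is in the default group and the highest parallelism is 2, which matches the 2 busy slots. In the DataStream API a group is assigned by calling `slotSharingGroup(String)` on an operator (for example on the result of `apply(...)`), and an operator inherits the group of its inputs when they all share one. The class `SlotEstimate`, its `Op` type and the group names `counts1`/`counts2` below are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (an assumption, not Flink code): each slot sharing group
 * needs as many slots as its highest operator parallelism, and the job needs
 * the sum of that over all groups.
 */
final class SlotEstimate {

    /** One operator of the job graph: name, parallelism and slot sharing group. */
    static final class Op {
        final String name;
        final int parallelism;
        final String group;

        Op(String name, int parallelism, String group) {
            this.name = name;
            this.parallelism = parallelism;
            this.group = group;
        }
    }

    /** Sum over groups of the maximum parallelism found in each group. */
    static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Op op : ops) {
            // keep the highest parallelism seen so far for this group
            maxPerGroup.merge(op.group, op.parallelism, Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Operators as they appear in the job log, all in the "default" group.
        List<Op> shared = Arrays.asList(
                new Op("Source: Socket Stream", 1, "default"),
                new Op("Flat Map", 2, "default"),
                new Op("counts1 window -> sink", 2, "default"),
                new Op("counts2 window -> sink", 2, "default"));
        // Hypothetical: each windowed pipeline moved into its own group.
        List<Op> separated = Arrays.asList(
                new Op("Source: Socket Stream", 1, "default"),
                new Op("Flat Map", 2, "default"),
                new Op("counts1 window -> sink", 2, "counts1"),
                new Op("counts2 window -> sink", 2, "counts2"));
        System.out.println("shared: " + requiredSlots(shared));
        System.out.println("separated: " + requiredSlots(separated));
    }
}
```

Under this model, giving each windowed pipeline its own group raises the requirement from 2 slots to 6 (2 for the source and flat map, plus 2 per window pipeline), so the cluster would need at least that many free slots.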



——

./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]
01/26/2017 22:08:46	Job execution switched to status RUNNING.
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to SCHEDULED 
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to DEPLOYING 
01/26/2017 22:08:46	Flat Map(1/2) switched to SCHEDULED 
01/26/2017 22:08:46	Flat Map(1/2) switched to DEPLOYING 
01/26/2017 22:08:46	Flat Map(2/2) switched to SCHEDULED 
01/26/2017 22:08:46	Flat Map(2/2) switched to DEPLOYING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to RUNNING 
01/26/2017 22:08:46	Flat Map(1/2) switched to RUNNING 
01/26/2017 22:08:46	Flat Map(2/2) switched to RUNNING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
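
To compare the number of running subtasks with the number of busy slots, the RUNNING lines of the client log can be tallied per task. The helper below is a hypothetical sketch, not part of Flink or its CLI; it assumes each log entry sits on a single line in the form date, time, a tab, then `<task name>(i/n) switched to RUNNING` (the wrapped TriggerWindow entries rejoined if necessary), and reads the task's parallelism from `n`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: tallies tasks and subtasks from "switched to RUNNING" log lines. */
final class RunningSubtasks {

    // The greedy "(.+)" backtracks to the last "(i/n)" on the line, so task names
    // that themselves contain parentheses, such as TriggerWindow(...), stay intact.
    private static final Pattern RUNNING =
            Pattern.compile("\\t(.+)\\((\\d+)/(\\d+)\\) switched to RUNNING");

    /** Maps each task name to its parallelism n, taken from its RUNNING lines. */
    static Map<String, Integer> parallelismByTask(List<String> lines) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = RUNNING.matcher(line);
            if (m.find()) {
                result.put(m.group(1), Integer.parseInt(m.group(3)));
            }
        }
        return result;
    }

    /** Total number of subtasks: the sum of all task parallelisms. */
    static int totalSubtasks(Map<String, Integer> byTask) {
        int total = 0;
        for (int p : byTask.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "01/26/2017 22:08:46\tSource: Socket Stream(1/1) switched to RUNNING ",
                "01/26/2017 22:08:46\tFlat Map(1/2) switched to RUNNING ",
                "01/26/2017 22:08:46\tFlat Map(2/2) switched to RUNNING ");
        Map<String, Integer> byTask = parallelismByTask(lines);
        System.out.println(byTask + " -> " + totalSubtasks(byTask) + " subtasks");
    }
}
```

For this log, the two window tasks differ only in their CountEvictor hashes (@1aa66c2e and @34509f7f), so the tally should come to four tasks and 1 + 2 + 2 + 2 = 7 subtasks, all of which ran in the 2 busy slots.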


Best,
Ovidiu