flink-user mailing list archives

From Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr>
Subject Re: parallelism for window operations
Date Fri, 27 Jan 2017 09:43:28 GMT
Thank you, Fabian!

It works. Here is what I did and the results, as an example for other users.
A total of 7 slots are occupied. I am not sure how to check that Source + Flat Map are in
the same slot; I assumed slot S1 holds both. Also, S6 and S7 seem to be different slots,
although I set the same slot sharing group name for both sinks.

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream("localhost", port, "\n");

		DataStream<IN> input = text.flatMap(...).slotSharingGroup("PInput").setParallelism(1); // ONE SLOT S1
		DataStream<Double> counts1 = null;

		counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() {
...
				}).slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1); // ONE SLOT S2

		DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
				}).slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3); // THREE SLOTS S3, S4, S5

		counts1.writeAsText(params.get("output1")).slotSharingGroup("output").setParallelism(1); // ONE SLOT S6
		counts2.writeAsText(params.get("output2")).slotSharingGroup("output").setParallelism(1); // ONE SLOT S7

		env.execute("Socket Window WordCount");
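
For readers who want to sanity-check the slot count, here is a minimal sketch in plain Java (no Flink dependency). It assumes the usual rule of thumb that a job needs, per slot sharing group, as many slots as the highest operator parallelism in that group, and that the socket source stays in the "default" group because no group is set on it. The class SlotEstimate and its methods are hypothetical helpers, not part of Flink. Under these assumptions the total is also 7, although the split across slots may differ from the S1..S7 labels above; the thread does not confirm either accounting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Flink API: estimates the slots a job needs
// as the sum, over slot sharing groups, of the highest parallelism in each group.
public class SlotEstimate {

    // Records one operator by its slot sharing group and parallelism,
    // keeping only the highest parallelism seen for each group.
    public static void add(Map<String, Integer> maxPerGroup, String group, int parallelism) {
        maxPerGroup.merge(group, parallelism, Math::max);
    }

    // Sums the per-group maxima.
    public static int requiredSlots(Map<String, Integer> maxPerGroup) {
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Job from this message. Assumption: the source is in "default".
        Map<String, Integer> tuned = new LinkedHashMap<>();
        add(tuned, "default", 1);      // socket source
        add(tuned, "PInput", 1);       // flat map
        add(tuned, "firstWindow", 1);  // counts1 window
        add(tuned, "secondWindow", 3); // counts2 window
        add(tuned, "output", 1);       // sink for counts1
        add(tuned, "output", 1);       // sink for counts2
        System.out.println(requiredSlots(tuned)); // prints 7

        // Original job: everything in "default" with parallelism 2 (source 1).
        Map<String, Integer> original = new LinkedHashMap<>();
        add(original, "default", 1); // socket source
        add(original, "default", 2); // flat map
        add(original, "default", 2); // counts1 window -> sink
        add(original, "default", 2); // counts2 window -> sink
        System.out.println(requiredSlots(original)); // prints 2
    }
}
```

The second calculation matches the 2 busy slots reported in the original question, where no slot sharing groups were set.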


Best,
Ovidiu

> On 27 Jan 2017, at 10:13, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi Ovidiu,
> 
> you can control the slot assignment by assigning operators to SlotSharingGroups.
> For example like this:
> 
> someStream.filter(...).slotSharingGroup("name");
> 
> Operators in different groups are scheduled to different slots. By default, all operators are in the same group.
> Have a look at the docs as well [1]
> 
> Best, Fabian
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#task-chaining-and-resource-groups
> 
> 2017-01-26 22:30 GMT+01:00 Ovidiu-Cristian MARCU <ovidiu-cristian.marcu@inria.fr>:
> Hi,
> 
> I have the following program configured with parallelism 2.
> After running this example I see only 2 slots are busy.
> 
> How can I ensure counts1 and counts2 are executed on their own slots with the given parallelism (in this case 2 slots each)?
> 
> 		port = params.getInt("port");
> 
> 		// get the execution environment
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 		env.setParallelism(params.getInt("paral", 2));
> 		env.setMaxParallelism(params.getInt("paral", 2));
> 
> 		// get input data by connecting to the socket
> 		DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> 
> 		DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
> 		DataStream<Double> counts1 = null;
> 
> 		counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
> 				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> ...
> 				});
> 
> 		DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
> 				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> ...
> 				});
> 
> 		counts1.writeAsText(params.get("output1"));
> 		counts2.writeAsText(params.get("output2"));
> 
> 		env.execute("Socket Window WordCount");
> <Screen Shot 2017-01-26 at 22.21.13.png>
> 
> 
> ——
> 
> ./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> Printing result to stdout. Use --output to specify output path.
> Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]
> 01/26/2017 22:08:46	Job execution switched to status RUNNING.
> 01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to SCHEDULED 
> 01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to DEPLOYING 
> 01/26/2017 22:08:46	Flat Map(1/2) switched to SCHEDULED 
> 01/26/2017 22:08:46	Flat Map(1/2) switched to DEPLOYING 
> 01/26/2017 22:08:46	Flat Map(2/2) switched to SCHEDULED 
> 01/26/2017 22:08:46	Flat Map(2/2) switched to DEPLOYING 
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
> 01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to RUNNING 
> 01/26/2017 22:08:46	Flat Map(1/2) switched to RUNNING 
> 01/26/2017 22:08:46	Flat Map(2/2) switched to RUNNING 
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
> 01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
> 
> Best,
> Ovidiu
> 

