flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chesnay Schepler <ches...@apache.org>
Subject Re: Reading Parameter values sent to partition
Date Sat, 28 May 2016 14:27:24 GMT
There are 2 flaws in your code:

Let's start with the fundamental one:

At no point do you associate your mapConf with the flatMap or even the job.

THeoretically you should add it to the flatMap using 
flatMap(...).withConfiguration(mapConf) method.

But here's is the second a more subtle flaw:

the withConfiguration() method does not work at all in the streaming API.

The solution to your problem is to pass your parameter k into the
constructor of MyFlatMap and store it in a field.

Regards,
Chesnay

On 28.05.2016 14:11, Biplob Biswas wrote:
> Hi,
>
> I am trying to send some static integer values down to each map function,
> using the following code
>
> 	public static void main(String[] args) throws Exception {
> 		
> 		ParameterTool params = ParameterTool.fromArgs(args);
> 		
> 		String filePath = params.getRequired("path");
> 		int k = params.getInt("k");
> 		
> 		Configuration mapConf = new Configuration();
> 		mapConf.setInteger("numberofMC", k);
>
> 		
> 		StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> 		
> 		DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath,
> streamSpeed));
>                  tuples.flatMap(new MyFlatmap())
>           }
>
> -------------------------------------------------------------------------------------------------------
> public static final class MyFlatmap extends RichFlatMapFunction<Point,
> 																		Tuple2&lt;MicroCluster[],Integer>>{
> 		int numofMC = 5;
> 		public MyCoFlatmap() {
> 		}
> 		
> 		public void open(Configuration parameters) throws Exception {
> 		      super.open(parameters);
>
> 		      numofMC = parameters.getInteger("numberofMC",-1);
> 				System.out.println(numofMC);
>
> 		    }
> 		
> 		@Override
> 		public void flatMap(Point in, Collector<Tuple2&lt;MicroCluster[],
> Integer>> out) throws Exception {
>                 }
>
>
> but when i do the above things to get the value of numberofMC, i dont get it
> to the map funcitons and it returns me the default value of -1.
>
> What could be the reason behind this?
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-Parameter-values-sent-to-partition-tp7228.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>


Mime
View raw message