flink-user mailing list archives

From Radu Tudoran <radu.tudo...@huawei.com>
Subject RE: Question about DataStream serialization
Date Tue, 08 Dec 2015 15:31:57 GMT
Hi,

The state that is being loaded can very well be partitioned by key. Assuming this scenario,
and that you know the keys range from 0 to N, is there a way to load and partition the
initial data in the open() function?


Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudoran@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173



-----Original Message-----
From: Aljoscha Krettek [mailto:aljoscha@apache.org] 
Sent: Tuesday, December 08, 2015 4:20 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Ah, I see what the problem is. Operator state is scoped to the key of the incoming element.
In the open() method, no element has been received yet, so the key of the incoming element
is basically NULL, and open() therefore initializes state for key NULL. In flatMap() we actually
have the key of the incoming element, so we access state for that specific key, which still has
the default value “0” (from the getKeyValueState() call).
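
To illustrate the scoping described above, here is a toy model in plain Java. It is only a sketch and not Flink's implementation: the names ScopedState, setCurrentKey and simulate are hypothetical, and the "current key" is assumed to be a simple field that stays null until an element arrives.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-scoped state; NOT Flink's implementation. */
final class KeyScopedStateDemo {

    /** Hypothetical stand-in for a key/value state handle with a default value. */
    static final class ScopedState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private final V defaultValue;
        private K currentKey; // stays null until the first element arrives

        ScopedState(V defaultValue) {
            this.defaultValue = defaultValue;
        }

        /** Assumed to be called by the runtime before each element is processed. */
        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        /** Reads the value stored for the current key, or the default if none exists. */
        V value() {
            return valuesByKey.getOrDefault(currentKey, defaultValue);
        }

        /** Writes the value for the current key only. */
        void update(V newValue) {
            valuesByKey.put(currentKey, newValue);
        }
    }

    /** Replays the open()/flatMap() sequence of the quoted job. */
    static int[] simulate() {
        ScopedState<String, Integer> dataset = new ScopedState<>(0);

        // "open()": no element has arrived yet, so every update lands under key null.
        for (int i = 0; i < 10; i++) {
            dataset.update(dataset.value() + 1);
        }
        int seenInOpen = dataset.value();

        // "flatMap()": the key of the incoming element is now set.
        dataset.setCurrentKey("hello");
        int seenInFlatMap = dataset.value();

        return new int[] {seenInOpen, seenInFlatMap};
    }

    public static void main(String[] args) {
        int[] seen = simulate();
        System.out.println("open(): " + seen[0] + ", flatMap(): " + seen[1]);
    }
}
```

In this model the ten updates made in "open()" are stored under the null key, so a read under the key "hello" falls back to the default value, which matches the behaviour reported in the quoted program.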

OperatorState is only useful if the state needs to be partitioned by key, but here it seems
that the state is valid for all elements?
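
If the loaded data really is the same for all elements, one option is to keep it in a plain member field that open() fills in. The sketch below only mirrors the open()/flatMap() shape in plain Java: LoadedDataFunction and run are hypothetical names, and a List stands in for the Collector. In a real job such a field would not be checkpointed, and every parallel instance would load its own copy.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of "load once in open(), read in flatMap()"; not Flink code. */
final class PlainFieldDemo {

    /** Hypothetical stand-in for a rich function holding non-keyed data. */
    static final class LoadedDataFunction {
        private int dataset; // plain field: shared by all keys seen by this instance

        /** Simulates loading data once, before any element is processed. */
        void open() {
            for (int i = 0; i < 10; i++) {
                dataset++;
            }
        }

        /** Emits a message for each element if data was loaded. */
        void flatMap(String element, List<String> out) {
            if (dataset > 0) {
                out.add("Test OK " + element);
            }
        }
    }

    /** Runs open() once and then flatMap() for each element, collecting the output. */
    static List<String> run(String... elements) {
        LoadedDataFunction function = new LoadedDataFunction();
        function.open();
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            function.flatMap(element, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b"));
    }
}
```

Because the field does not depend on a current key, the value written in open() is visible for every element, regardless of its key.
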
> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudoran@huawei.com> wrote:
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> 				.getExecutionEnvironment();
> 
> 		DataStream<String> stream = env
> 				.socketTextStream("localhost", 16333, '\n')
> 				.map(new MapFunction<String, Tuple1<String>>() {
> 					@Override
> 					public Tuple1<String> map(String arg0) throws Exception {
> 						return new Tuple1<String>(arg0);
> 					}
> 				}).keyBy(0)
> 				.flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
> 
> 					private OperatorState<Integer> dataset;
> 
> 					@Override
> 					public void flatMap(Tuple1<String> arg0,
> 							Collector<String> arg1) throws Exception {
> 
> 						if (dataset.value() > 0)
> 							arg1.collect("Test OK " + arg0);
> 
> 						
> 						
> 					}
> 
> 					@Override
> 					public void open(Configuration parameters) throws Exception {
> 
> 						dataset = getRuntimeContext().getKeyValueState(
> 								"loadeddata", Integer.class, 0);
> 
> 						
> 						 /*
> 						  * Simulate loading data.
> 						  * If this part is commented out and the dataset is instead
> 						  * initialized with 1, for example, then the non-zero value is
> 						  * available in the flatMap function.
> 						  */
> 						  
> 						  for(int i=0;i<10;i++) {
> 						  	  dataset.update(dataset.value()+1);
> 						  }
> 						  
> 						  //System.out.println("dataset value "+dataset.value());
> 						  
> 					}
> 				});
> 
> 		stream.print();
> 
> 		env.execute("test open function");
