flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chesnay Schepler <ches...@apache.org>
Subject Re: Flink and factories?
Date Wed, 19 Oct 2016 16:58:07 GMT
Hello,

admittedly i didn't look to deeply into this, but I would assume that 
you are only modifying the factory on the client. When the operators are 
deserialized on a cluster, your singleton instance is back to the 
default, which is apples (i think), since the statement that changes the 
factory is never being executed there.

Basically, don't use static singletons. Instead, store the factory 
returned by Config.getInstance().getFactory() in a field in your function.

Regards,
Chesnay

On 19.10.2016 18:07, Sebastian Neef wrote:
> Hi,
>
> I'm currently working with flink for my bachelor thesis and I'm running
> into some odd issues with flink in regards to factories.
>
> I've built a small "proof of concept" and the code can be found here:
> https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest
>
> The idea is that a Config-singleton holds information or objects to use,
> e.g. an AppleFactory (default) which implements a specific IDataFactory
> interface. This AppleFactory is then used in a flatMap to create Apples
> (objects which implement the IData interface):
>
>>          System.out.println("Factory before processedData: " + Config.getInstance().getFactory().getClass());
>>          DataSet<IData> processedData = this.getEnv().fromCollection(inputData).flatMap(new
FlatMapFunction<Integer, IData>() {
>>              @Override
>>              public void flatMap(Integer integer, Collector<IData> collector)
throws Exception {
>>                  if (integer % 2 == 0) {
>>                      collector.collect(Config.getInstance().getFactory().newInstance());
>>                  }
>>              }
>>          });
>>         System.out.println("Factory after processedData: " + Config.getInstance().getFactory().getClass());
>>          try {
>>              System.out.println("Class created: " + processedData.collect().get(0).getClass());
>>              this.getDataHolder().setDataList(processedData.collect());
>>          } catch (Exception e) {
>>              e.printStackTrace();
>>          }
>
> This happens in the "Config -> initData()" function. My Flink-Job looks
> like this:
>
>> 	public static void main(String[] args) throws Exception {
>>
>> 		Config c = Config.getInstance(); //Use AppleFactory by default
>>
>>          //BOOM: Somehow flink ignores this?
>>          c.setFactory(new PearFactory());
>>
>>          c.initData();
>>
>>          DataSet<IData> data = c.getEnv().fromCollection(c.getDataHolder().getDataList());
> As you can see before the "c.initData()" call I set the factory to a
> "PearFactory()" which will produce Pear-objects (also implementing the
> IData interface).
>
> Running the code will print the following text:
>
>> Class created: class factorytest.Data.Apple
> This, however, means that flink didn't catch (or ignored?) that the
> factory has changed and still creates objects of type Apple.
>
> Instead I'd expect the processedData.collect() list to contain
> Pear-objects. What is even more confusing is that the two "Factory
> before/after processedData" print statements correctly return the
> PearFactory class.
>
> What's the best way to fix this? Any tips/tricks/questions?
>
> I guess that this issue is might be hard to explain in words, so I'd
> really appreciate it if someone could have a look at the code and maybe
> do an example run:
>
> Job.java:
> https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Job.java
>
> Config.java:
> https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Config.java
>
> Example run:
> https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/EXAMPLE_RUN_OUTPUT.txt
>
> Kind regards,
> Sebastian Neef
>


Mime
View raw message