flink-user mailing list archives

From Giacomo Licari <giacomo.lic...@gmail.com>
Subject Re: Inheritance and FlatMap with custom POJO
Date Wed, 16 Sep 2015 15:20:20 GMT
Yes I did.

If anyone has a workaround, please let us know.

Regards,
Giacomo Licari

On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanpark@apache.org> wrote:

> Hi Giacomo,
>
> Did you create constructors without arguments in both the base class and
> the derived class?
> If you did, it seems like a bug.
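A minimal sketch of the class shape being asked about, assuming plain Java with no Flink dependency: the base class and the derived class are both public and each declares a public no-argument constructor, and every field is either public or exposed through a getter and setter. `PojoShapeDemo` and its `instantiate` helper are hypothetical; the helper only imitates, in simplified form, how a framework can build such an object reflectively (no-arg constructor first, then field assignment). It is not Flink's code.

```java
import java.lang.reflect.Constructor;

public class PojoShapeDemo {

    // Base class: public, with an explicit public no-argument constructor.
    public static class Base {
        public String table; // public fields are accessible directly
        public String time;

        public Base() {}
    }

    // Derived class: also public, with its own public no-argument constructor.
    public static class TwitterPOJO extends Base {
        private String tweet; // private field exposed through a getter/setter pair

        public TwitterPOJO() {}

        public String getTweet() { return tweet; }

        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    // Hypothetical helper: builds the object via the no-arg constructor, then sets fields.
    public static TwitterPOJO instantiate(String table, String time, String tweet) {
        try {
            Constructor<TwitterPOJO> ctor = TwitterPOJO.class.getConstructor();
            TwitterPOJO obj = ctor.newInstance();
            // getField also searches superclasses, so the inherited public fields are found.
            TwitterPOJO.class.getField("table").set(obj, table);
            TwitterPOJO.class.getField("time").set(obj, time);
            obj.setTweet(tweet);
            return obj;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TwitterPOJO p = instantiate("rain", "2015-09-16T13:45:10.123", "hello");
        System.out.println(p.table + " " + p.time + " " + p.getTweet());
    }
}
```

Note that `Class.getField` returns inherited public fields, whereas `getDeclaredField` on `TwitterPOJO` would only see `tweet`.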
>
> Regards,
> Chiwan Park
>
> > On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.licari@gmail.com>
> wrote:
> >
> > Hi Chiwan,
> > I followed the instructions in the documentation.
> > I have a simple base class with some properties (all public).
> > Then I extend that class with a new public property (tweet in my case),
> > and I also provide a getter and setter for that property.
> >
> > Now when I execute:
> > DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://" + path_twitter)
> >                    .pojoType(TwitterPOJO.class, "table", "time", "tweet");
> >
> > I receive:
> > There is no field called "table" in com.Flink.POJO.TwitterPOJO
> >
> > table is a field of the Base class, declared public, with a getter and
> > setter as well.
> >
> > Thank you for your help.
> >
> > Giacomo
> >
> > On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanpark@apache.org>
> wrote:
> > Hi Giacomo,
> >
> > You should declare your fields as public. If your fields are private or
> > protected, the class must provide getters and setters for them to be
> > treated as a POJO.
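The rule described above can be illustrated with a small, hypothetical checker (it is not Flink's `TypeExtractor`): it walks the class and all of its superclasses and flags every non-static, non-transient field that is neither public nor reachable through a public `getX`/`setX` pair, and it separately checks for a public no-argument constructor. It is a simplification under stated assumptions; for instance, it ignores `isX` getters for booleans. Whether Flink at the time handled inherited fields this way is exactly what the thread is unsure about.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    public static class Base {
        public String table;    // public: fine
        protected String time;  // protected: needs a public getter and setter

        public String getTime() { return time; }

        public void setTime(String time) { this.time = time; }
    }

    public static class Derived extends Base {
        private String tweet;   // private with a getter but no setter: flagged

        public String getTweet() { return tweet; }
    }

    public static class NoDefault {
        public NoDefault(int x) {} // no public no-arg constructor
    }

    // Names of own and inherited fields that are neither public nor covered by accessors.
    public static List<String> inaccessibleFields(Class<?> clazz) {
        List<String> bad = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) continue;
                if (Modifier.isPublic(mod)) continue;
                if (!hasAccessors(clazz, f)) bad.add(f.getName());
            }
        }
        return bad;
    }

    // getMethod only returns public methods, including inherited ones.
    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String n = f.getName();
        String cap = Character.toUpperCase(n.charAt(0)) + n.substring(1);
        try {
            clazz.getMethod("get" + cap);
            clazz.getMethod("set" + cap, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(inaccessibleFields(Derived.class));
        System.out.println(hasPublicNoArgConstructor(NoDefault.class));
    }
}
```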
> >
> > The documentation on the homepage [1] may be helpful.
> >
> > Regards,
> > Chiwan Park
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
> >
> > > On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.licari@gmail.com>
> wrote:
> > >
> > > I ran it only with java.io.Serializable implemented, without disabling
> > > the closure cleaner.
> > >
> > > Another question I have is about POJO classes.
> > > I would also like to create a base POJO class with some common
> > > properties, and then extend it in new classes. These classes are used to
> > > convert a CSV file into a DataSet of POJO objects (of the derived class type).
> > >
> > > In the following example, I create a DataSet of TwitterPOJO, which
> > > extends a Base class, adding the new property "tweet".
> > >
> > > DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://" + path_twitter)
> > >                    .pojoType(TwitterPOJO.class, "table", "time", "tweet");
> > >
> > > I get this error:
> > > [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
> > > Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo
> > >
> > > Greetings,
> > > G.L.
> > >
> > > On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <sewen@apache.org>
> wrote:
> > > Could you also try the other variant (disabling the closure cleaner)?
> > > I would be curious whether this behavior is expected Java serialization
> > > behavior, or whether our pre-processing code is causing it.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <
> giacomo.licari@gmail.com> wrote:
> > > Thank you Martin and Stephan for your help.
> > > I tried implementing java.io.Serializable directly in the Base class,
> > > and it worked perfectly!
> > >
> > > Now I can develop more flexible and maintainable code. Thanks a lot,
> > > guys.
> > >
> > > Greetings,
> > > Giacomo
> > >
> > > On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <sewen@apache.org>
> wrote:
> > > Hi!
> > >
> > > Interesting case. We use plain Java serialization to distribute UDFs,
> > > and perform additional "cleaning" of scopes, which may be causing the issue.
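The plain Java serialization behavior mentioned above can be reproduced without Flink. In this sketch (class names hypothetical, java.io only), `Derived` is serializable but `Base` is not, which mirrors the original code, where `CullTimeRainfall` is serializable through `FlatMapFunction` while `CullTimeBase` is not. Java serialization does not write the fields of a non-serializable superclass; on deserialization it runs that superclass's no-argument constructor instead, so the inherited fields come back with default values. If `Base` had no accessible no-arg constructor, deserialization would fail with an `InvalidClassException` instead. This is one plausible explanation for the null values, not a confirmed diagnosis of Flink's behavior.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableBaseDemo {

    // Base class does NOT implement Serializable (like CullTimeBase in the original post).
    public static class Base {
        public String timeUnit;

        // Java serialization runs this constructor when deserializing a subclass.
        public Base() {}

        public Base(String timeUnit) { this.timeUnit = timeUnit; }
    }

    // Subclass is Serializable, so only its own fields are written to the stream.
    public static class Derived extends Base implements Serializable {
        private static final long serialVersionUID = 1L;
        public String ownField;

        public Derived(String timeUnit, String ownField) {
            super(timeUnit);
            this.ownField = ownField;
        }
    }

    // Writes obj to a byte array and reads it back, roughly what shipping a UDF involves.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Derived copy = roundTrip(new Derived("hours", "kept"));
        // The base-class field was not serialized; Base() left it null.
        System.out.println(copy.timeUnit + " / " + copy.ownField); // prints "null / kept"
    }
}
```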
> > >
> > > Can you try the following to see if any of those resolves the problem?
> > >
> > > 1) On the environment, disable the closure cleaner (in the execution config).
> > >
> > > 2) Let the CullTimeBase class implement java.io.Serializable.
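Suggestion 2 can be sketched the same way, again with hypothetical class names and only java.io: once `Base` implements `Serializable`, its fields are written and restored together with the subclass's, and the base class no longer needs a no-arg constructor for serialization. A `java.util.Date` field is included because `CullTimeBase` stores `startTime` and `endTime` as `Date`, which is itself serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

public class SerializableBaseDemo {

    // Base now implements Serializable, so its fields are part of the stream.
    public static class Base implements Serializable {
        private static final long serialVersionUID = 1L;
        public String timeUnit;
        public Date startTime;

        public Base(String timeUnit, Date startTime) {
            this.timeUnit = timeUnit;
            this.startTime = startTime;
        }
    }

    // Derived inherits Serializable from Base.
    public static class Derived extends Base {
        private static final long serialVersionUID = 1L;
        public String ownField;

        public Derived(String timeUnit, Date startTime, String ownField) {
            super(timeUnit, startTime);
            this.ownField = ownField;
        }
    }

    // Writes obj to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Derived copy = roundTrip(new Derived("hours", new Date(0L), "kept"));
        // All fields, including those declared in Base, survive the round trip.
        System.out.println(copy.timeUnit + " / " + copy.startTime.getTime() + " / " + copy.ownField);
    }
}
```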
> > >
> > > Please let us know how it turns out!
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <
> m.junghanns@mailbox.org> wrote:
> > > Hi Giacomo,
> > >
> > > I ran into the same issue. It seems to be tied to the serialization
> > > mechanism for UDFs. I solved it by letting the base class implement the UDF
> > > interface (e.g. FlatMapFunction) and also making it generic (which
> > > should be necessary in your example).
> > >
> > > public abstract class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
> > >     // ...
> > > }
> > >
> > > public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
> > >     // ...
> > > }
> > >
> > > This should work.
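This fix plausibly works for the same reason as making the base class serializable: Flink's function interfaces such as `FlatMapFunction` extend `java.io.Serializable`, so a base class that implements `FlatMapFunction` is serializable too. The sketch below shows the generic-base pattern without Flink. `SimpleFlatMap` is a hypothetical stand-in for `FlatMapFunction`, a `List` stands in for `Collector`, and this `RainfallPOJO` carries an epoch-millisecond `time` instead of the string parsed in the original code. The range check is strict on both ends, like the original `after`/`before` test.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GenericBaseDemo {

    // Hypothetical stand-in for FlatMapFunction; it extends Serializable like Flink's interfaces.
    public interface SimpleFlatMap<IN, OUT> extends Serializable {
        void flatMap(IN value, List<OUT> out); // the List plays the role of Collector
    }

    // Generic base class: holds the shared settings and implements the function interface.
    public abstract static class CullTimeBase<IN, OUT> implements SimpleFlatMap<IN, OUT> {
        private static final long serialVersionUID = 1L;
        protected final long startMillis;
        protected final long endMillis;

        protected CullTimeBase(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        // Shared helper: true if t lies strictly between start and end.
        protected boolean inRange(long t) {
            return t > startMillis && t < endMillis;
        }
    }

    // Hypothetical POJO: public fields plus a public no-arg constructor.
    public static class RainfallPOJO {
        public long time; // epoch milliseconds
        public double mm;

        public RainfallPOJO() {}

        public RainfallPOJO(long time, double mm) {
            this.time = time;
            this.mm = mm;
        }
    }

    // The derived class only specializes the types and the flatMap body.
    public static class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
        private static final long serialVersionUID = 1L;

        public CullTimeRainfall(long startMillis, long endMillis) {
            super(startMillis, endMillis);
        }

        @Override
        public void flatMap(RainfallPOJO value, List<RainfallPOJO> out) {
            if (inRange(value.time)) {
                out.add(value);
            }
        }
    }

    // Applies fn to every input element and gathers the emitted results.
    public static <IN, OUT> List<OUT> run(SimpleFlatMap<IN, OUT> fn, List<IN> input) {
        List<OUT> out = new ArrayList<>();
        for (IN value : input) {
            fn.flatMap(value, out);
        }
        return out;
    }

    public static void main(String[] args) {
        CullTimeRainfall fn = new CullTimeRainfall(0L, 10L);
        List<RainfallPOJO> kept = run(fn, Arrays.asList(
                new RainfallPOJO(5L, 1.0), new RainfallPOJO(10L, 2.0), new RainfallPOJO(15L, 3.0)));
        System.out.println(kept.size() + " record(s) kept; serializable: " + (fn instanceof Serializable));
    }
}
```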
> > >
> > > Best,
> > > Martin
> > >
> > >
> > > On 16.09.2015 10:41, Giacomo Licari wrote:
> > >> Hi guys,
> > >> I'm trying to create a base class that is extended by classes
> > >> implementing the flatMap method on specific POJO types.
> > >>
> > >> It seems inheritance doesn't work: I can access this.PropertyName or
> > >> super.PropertyName from the flatMap method, but the values are always null.
> > >>
> > >> Here is the derived class, using RainfallPOJO:
> > >>
> > >> import java.text.DateFormat;
> > >> import java.text.SimpleDateFormat;
> > >> import java.util.Date;
> > >> import org.apache.flink.api.common.functions.FlatMapFunction;
> > >> import org.apache.flink.util.Collector;
> > >>
> > >> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
> > >>
> > >>     public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit) {
> > >>         super(num, den, time_data_name, start_time, end_time, interval, time_unit);
> > >>     }
> > >>
> > >>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
> > >>         // HH (hour 0-23) matches the pattern used in the base class constructor
> > >>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
> > >>         try {
> > >>             Date time = formatter.parse(obj.getTime().replaceAll("([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
> > >>             // keep only records strictly inside the configured time window
> > >>             if (time.after(this.startTime) && time.before(this.endTime)) {
> > >>                 coll.collect(obj);
> > >>             }
> > >>         } catch (Exception e) {
> > >>             e.printStackTrace();
> > >>         }
> > >>     }
> > >>
> > >> }
> > >>
> > >> My Base class is:
> > >>
> > >> import java.text.DateFormat;
> > >> import java.text.ParseException;
> > >> import java.text.SimpleDateFormat;
> > >> import java.util.Date;
> > >>
> > >> public class CullTimeBase {
> > >>
> > >>     protected int numerator;
> > >>     protected int denominator;
> > >>     protected String timeDataName;
> > >>     protected Date startTime;
> > >>     protected Date endTime;
> > >>     protected int interval;
> > >>     protected String timeUnit;
> > >>
> > >>     public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit) {
> > >>         numerator = num;
> > >>         denominator = den;
> > >>         timeDataName = time_data_name;
> > >>         interval = interv;
> > >>         timeUnit = time_unit;
> > >>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
> > >>         try {
> > >>             startTime = formatter.parse(start_time);
> > >>             endTime = formatter.parse(end_time);
> > >>         } catch (ParseException e) {
> > >>             e.printStackTrace();
> > >>         }
> > >>     }
> > >> }
> > >>
> > >> It only works if I declare all variables and methods in a single
> > >> class, but then I would have to repeat the same properties in several
> > >> classes. I would like to specialize each derived class only with a custom
> > >> flatMap method that uses a custom POJO type.
> > >>
> > >> Thanks a lot,
> > >> Giacomo
