flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Join two Datasets --> InvalidProgramException
Date Tue, 09 Feb 2016 20:03:49 GMT
Hi,
Glad you could resolve the POJO issue, but the new error doesn't look
right.
The CO_GROUP_RAW strategy should only be used for programs that are
implemented against the Python DataSet API.
I guess that's not the case here, since all code snippets so far were Java.

Can you post the full stacktrace of the exception?

2016-02-09 20:13 GMT+01:00 Dominique Rondé <dominique.ronde@codecentric.de>:

> Hi all,
>
> I finally figured out that a getter for a boolean field may be the
> source of the trouble. It seems that getBooleanField (as we use it)
> is not the best choice. Now the plan executes, but fails with a different error. :(
>
> Caused by: java.lang.Exception: Unsupported driver strategy for join
> driver: CO_GROUP_RAW
>
> Is there any link to documentation or example code that you would
> recommend besides the official documentation?
>
> But folks, thanks for your great support! A really nice community here!
>
> Greets
> Dominique
>
>
> On 09.02.2016 at 19:41, Till Rohrmann wrote:
>
> I tested the TypeExtractor with your SourceA and SourceB types (adding
> proper setters and getters) and it correctly returned a PojoType. Thus, I
> would suspect that you haven’t specified the proper setters and getters in
> your implementation.
>
> Cheers,
> Till
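To make the getter/setter requirement discussed in this thread concrete, here is a small plain-Java sketch that approximates the POJO rules from the Flink programming guide linked by Chiwan (public class, public no-argument constructor, every non-public field reachable through the usual getFoo()/setFoo() pair) using only java.lang.reflect. It is a hypothetical, simplified check, not Flink's TypeExtractor: it only looks for get-prefixed getters and ignores other rules such as supported field types. Rejecting abstract classes is an assumption here, consistent with Till's remark that a DataSet<Parent> is treated as a GenericType. The classes Parent, GoodSource and BadSource are hypothetical stand-ins for the thread's types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified approximation of Flink's POJO rules; NOT Flink's TypeExtractor.
public class PojoCheck {

    // Returns rule violations; an empty list means the class passes this simplified check.
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        // Assumption: abstract classes and interfaces are not treated as POJOs.
        if (clazz.isInterface() || Modifier.isAbstract(mods)) {
            problems.add("class is abstract or an interface");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) {
                problems.add("no-argument constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-argument constructor");
        }
        // Walk up the hierarchy so inherited fields (e.g. sessionId in Parent) are checked too.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)
                        || Modifier.isPublic(fm)) {
                    continue; // these fields need no accessors in this check
                }
                String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                if (!hasMethod(clazz, "get" + cap, f.getType())) {
                    problems.add("missing getter get" + cap + "() for field " + f.getName());
                }
                if (!hasMethod(clazz, "set" + cap, null, f.getType())) {
                    problems.add("missing setter set" + cap + "(" + f.getType().getSimpleName() + ")");
                }
            }
        }
        return problems;
    }

    // True if clazz has a public method with this name and parameter types and,
    // when expectedReturn is non-null, exactly that return type.
    private static boolean hasMethod(Class<?> clazz, String name, Class<?> expectedReturn,
                                     Class<?>... params) {
        try {
            Method m = clazz.getMethod(name, params);
            return expectedReturn == null || m.getReturnType() == expectedReturn;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical stand-ins for the thread's types (fields trimmed for brevity).
    public abstract static class Parent {
        private String sessionId;
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    // Accessor names match the field name, so this passes.
    public static class GoodSource extends Parent {
        private Boolean outboundMessage;
        public Boolean getOutboundMessage() { return outboundMessage; }
        public void setOutboundMessage(Boolean value) { this.outboundMessage = value; }
    }

    // Getter name does not match the field name, so getOutboundMessage() is reported missing.
    public static class BadSource extends Parent {
        private Boolean outboundMessage;
        public Boolean getOutbound() { return outboundMessage; }
        public void setOutboundMessage(Boolean value) { this.outboundMessage = value; }
    }

    public static void main(String[] args) {
        System.out.println("Parent: " + violations(Parent.class));
        System.out.println("GoodSource: " + violations(GoodSource.class));
        System.out.println("BadSource: " + violations(BadSource.class));
    }
}
```

In this sketch GoodSource passes, including the inherited sessionId field, while BadSource is flagged because its getter is named getOutbound() instead of getOutboundMessage(). That is a naming mismatch of the kind that may explain the boolean-getter problem reported in this thread.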
>
> On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <
> dominique.ronde@codecentric.de> wrote:
>
>> Here we go!
>>
>>   ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
>>       "xxx.xxx.xxx.xxx", 53408, "flink-job.jar");
>>
>>   DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
>>   DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
>>
>>   DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>>   DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>>
>>   sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>>
>> Thanks a lot!
>> Dominique
>>
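The join in this snippet pairs every SourceA with every SourceB that has an equal sessionId and, without a join function, yields Tuple2<SourceA, SourceB> records. As an illustration of those semantics only (Flink picks its own distributed execution strategy), here is a plain-Java hash-join sketch. The classes A, B and Pair are hypothetical, trimmed-down stand-ins for the thread's types. The sketch also suggests why a String sessionId should not be a problem in itself: String keys are compared by content.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of equi-join semantics on a String key; NOT Flink's execution strategy.
public class JoinSketch {

    // Hypothetical, trimmed-down stand-ins for SourceA and SourceB.
    public static final class A {
        public final String sessionId;
        public final String soapMessage;
        public A(String sessionId, String soapMessage) {
            this.sessionId = sessionId;
            this.soapMessage = soapMessage;
        }
    }

    public static final class B {
        public final String sessionId;
        public final String username;
        public B(String sessionId, String username) {
            this.sessionId = sessionId;
            this.username = username;
        }
    }

    // One joined record, analogous to the Tuple2<SourceA, SourceB> produced by the join above.
    public static final class Pair {
        public final A first;
        public final B second;
        public Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
        @Override
        public String toString() {
            return "(" + first.soapMessage + ", " + second.username + ")";
        }
    }

    // Index the right side by key, then probe the index with every left record.
    public static List<Pair> joinOnSessionId(List<A> left, List<B> right) {
        Map<String, List<B>> index = new HashMap<>();
        for (B b : right) {
            index.computeIfAbsent(b.sessionId, k -> new ArrayList<>()).add(b);
        }
        List<Pair> out = new ArrayList<>();
        for (A a : left) {
            // HashMap looks up String keys via equals()/hashCode(), i.e. by content.
            for (B b : index.getOrDefault(a.sessionId, Collections.emptyList())) {
                out.add(new Pair(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // new String(...) creates a distinct object with the same content as "s1".
        List<A> left = Arrays.asList(new A(new String("s1"), "soap-1"), new A("s2", "soap-2"));
        List<B> right = Arrays.asList(new B("s1", "alice"), new B("s3", "bob"));
        System.out.println(joinOnSessionId(left, right)); // prints [(soap-1, alice)]
    }
}
```

Here the distinct String object still matches "s1", and records without a partner (s2, s3) are dropped, as in an inner join.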
>>
>> On 09.02.2016 at 14:36, Till Rohrmann wrote:
>>
>> Could you post the complete example code (the Flink job including the type
>> definitions)? For example, if the data sets are of type DataSet<Parent>,
>> they will be treated as a GenericType. Judging from your pseudo code,
>> it looks fine at first glance.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>
>>> Sorry, I was out for lunch. Could the problem be that sessionId is a
>>> String?
>>>
>>> public abstract class Parent {
>>>   private Date eventDate;
>>>   private EventType eventType;
>>>   private String sessionId;
>>>
>>>   public Parent() { }
>>>   // GETTER & SETTER
>>> }
>>>
>>> public class SourceA extends Parent {
>>>   private Boolean outboundMessage;
>>>   private String soapMessage;
>>>
>>>   public SourceA() {
>>>     super();
>>>   }
>>>   // GETTER & SETTER
>>> }
>>>
>>> public class SourceB extends Parent {
>>>   private Integer id;
>>>   private String username;
>>>
>>>   public SourceB() {
>>>     super();
>>>   }
>>>   // GETTER & SETTER
>>> }
>>>
>>> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>>
>>> Could you share the code for your types SourceA and SourceB? It seems
>>> that Flink does not recognize them as POJOs, because it assigned them
>>> the GenericType type. Either there is something wrong with the type
>>> extractor, or your implementation does not fulfil the requirements for
>>> POJOs, as Chiwan indicated.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>>
>>>> The fields in SourceA and SourceB are private but have public getters
>>>> and setters. The classes provide a public, empty constructor.
>>>> On 09.02.2016 11:47, "Chiwan Park" <chiwanpark@apache.org> wrote:
>>>>
>>>>> Oh, I see that the fields in SourceA have public getters. Do the fields in
>>>>> SourceA also have public setters? SourceA needs public setters for its private fields.
>>>>>
>>>>> Regards,
>>>>> Chiwan Park
>>>>>
>>>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanpark@apache.org> wrote:
>>>>> >
>>>>> > Hi Dominique,
>>>>> >
>>>>> > It seems that `SourceA` is not treated as a POJO. Are all fields in
>>>>> > SourceA public? POJO classes have to meet some requirements [1].
>>>>> >
>>>>> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>>>> >
>>>>> > Regards,
>>>>> > Chiwan Park
>>>>> >
>>>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>>>> >>
>>>>> >> Hi  folks,
>>>>> >>
>>>>> >> I am trying to join two datasets containing some POJOs. Each POJO inherits
>>>>> >> a field "sessionId" from the parent class. The field is private but has a
>>>>> >> public getter.
>>>>> >>
>>>>> >> The join looks like this:
>>>>> >> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet =
>>>>> >>     sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>>>> >>
>>>>> >> But the result is the following exception:
>>>>> >>
>>>>> >> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
>>>>> >>    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>>>> >>    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>>>> >>    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>>>> >>
>>>>> >> I spent some time googling, but I have no idea what is wrong. I hope
>>>>> >> some of you can give me a hint...
>>>>> >>
>>>>> >> Greets
>>>>> >> Dominique
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>
>>> --
>>> Dominique Rondé | Senior Consultant
>>>
>>> codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Germany
>>> mobile: +49 (0) 172.7182592 | www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>>>
>>> Registered office: Solingen | HRB 25917 | Wuppertal District Court
>>> Executive Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Supervisory Board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz
>>>
>>>
>>
>>
>
>
