From Dominique Rondé <dominique.ro...@codecentric.de>
Subject Re: Join two Datasets --> InvalidProgramException
Date Wed, 10 Feb 2016 06:33:08 GMT
Hi,

Your guess is correct: I use Java throughout. Here is the complete
stack trace:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
     at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
     at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
     at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
     at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
     at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
     at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
     at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
     at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
     ... 3 more

On 09.02.2016 at 21:03, Fabian Hueske wrote:
> Hi,
> glad you could resolve the POJO issue, but the new error doesn't look 
> right.
> The CO_GROUP_RAW strategy should only be used for programs that are 
> implemented against the Python DataSet API.
> I guess that's not the case since all code snippets were Java so far.
>
> Can you post the full stacktrace of the exception?
>
> 2016-02-09 20:13 GMT+01:00 Dominique Rondé <dominique.ronde@codecentric.de>:
>
>     Hi all,
>
>     I finally figured out that there is a getter for a boolean field
>     which may be the source of the trouble. It seems that
>     getBooleanField (as we use it) is not the best choice. Now the
>     plan is executed, but it fails with a different error. :(
>
>     Caused by: java.lang.Exception: Unsupported driver strategy for
>     join driver: CO_GROUP_RAW
>
>     Is there any link to documentation or example code that you
>     would recommend besides the official documentation?
>
>     But folks, thanks for your great support! A really nice community
>     here!
>
>     Greets
>     Dominique
>
>
>     On 09.02.2016 at 19:41, Till Rohrmann wrote:
>>
>>     I tested the |TypeExtractor| with your |SourceA| and |SourceB|
>>     types (adding proper setters and getters) and it correctly
>>     returned a |PojoType|. Thus, I would suspect that you haven’t
>>     specified the proper setters and getters in your implementation.
>>
>>     Cheers,
>>     Till
>>
>>     On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>
>>         Here we go!
>>
>>         ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
>>                 "xxx.xxx.xxx.xxx", 53408, "flink-job.jar");
>>
>>         DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
>>         DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
>>
>>         DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>>         DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>>
>>         sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>>
>>         Thanks a lot!
>>         Dominique
>>
>>
>>         On 09.02.2016 at 14:36, Till Rohrmann wrote:
>>>
>>>         Could you post the complete example code (the Flink job
>>>         including the type definitions)? For example, if the data
>>>         sets are of type |DataSet<Parent>|, then they will be treated
>>>         as a |GenericType|. Judging from your pseudo code, it looks
>>>         fine at first glance.
>>>
>>>         Cheers,
>>>         Till
>>>
>>>         On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>>
>>>             Sorry, I was out for lunch. Maybe the problem is that
>>>             sessionId is a String?
>>>
>>>             public abstract class Parent {
>>>                 private Date eventDate;
>>>                 private EventType eventType;
>>>                 private String sessionId;
>>>
>>>                 public Parent() { }
>>>                 // GETTER & SETTER
>>>             }
>>>
>>>             public class SourceA extends Parent {
>>>                 private Boolean outboundMessage;
>>>                 private String soapMessage;
>>>
>>>                 public SourceA() {
>>>                     super();
>>>                 }
>>>                 // GETTER & SETTER
>>>             }
>>>
>>>             public class SourceB extends Parent {
>>>                 private Integer id;
>>>                 private String username;
>>>
>>>                 public SourceB() {
>>>                     super();
>>>                 }
>>>                 // GETTER & SETTER
>>>             }
>>>
>>>             On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>>>
>>>>             Could you share the code for your types |SourceA| and
>>>>             |SourceB|? It seems as if Flink does not recognize them
>>>>             as POJOs, because it assigned them the |GenericType|
>>>>             type. Either there is something wrong with the type
>>>>             extractor or your implementation does not fulfil the
>>>>             requirements for POJOs, as indicated by Chiwan.
>>>>
>>>>             Cheers,
>>>>             Till
>>>>
>>>>             On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>>>
>>>>                 The fields in SourceA and SourceB are private but
>>>>                 have public getters and setters. The classes
>>>>                 provide an empty and public constructor.
>>>>
>>>>                 On 09.02.2016 at 11:47, Chiwan Park <chiwanpark@apache.org> wrote:
>>>>
>>>>                     Oh, the fields in SourceA have public getters.
>>>>                     Do the fields in SourceA have public setters?
>>>>                     SourceA needs public setters for its private fields.
>>>>
>>>>                     Regards,
>>>>                     Chiwan Park
>>>>
>>>>                     > On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanpark@apache.org> wrote:
>>>>                     >
>>>>                     > Hi Dominique,
>>>>                     >
>>>>                     > It seems that `SourceA` is not treated as a POJO.
>>>>                     Are all fields in SourceA public? There are
>>>>                     some requirements for POJO classes [1].
>>>>                     >
>>>>                     > [1]:
>>>>                     https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>>>                     >
>>>>                     > Regards,
>>>>                     > Chiwan Park
>>>>                     >
>>>>                     >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ronde@codecentric.de> wrote:
>>>>                     >>
>>>>                     >> Hi folks,
>>>>                     >>
>>>>                     >> I am trying to join two datasets containing
>>>>                     POJOs. Each POJO inherits a field "sessionId"
>>>>                     from the parent class. The field is private but
>>>>                     has a public getter.
>>>>                     >>
>>>>                     >> The join is like this:
>>>>                     >> DataSet<Tuple2<SourceA,SourceB>>
>>>>                     joinedDataSet =
>>>>                     sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>>>                     >>
>>>>                     >> But the result is the following exception:
>>>>                     >>
>>>>                     >> Exception in thread "main"
>>>>                     org.apache.flink.api.common.InvalidProgramException:
>>>>                     This type
>>>>                     (GenericType<x.y.z.service.eventstore.dto.SourceA>)
>>>>                     cannot be used as key.
>>>>                     >>    at
>>>>                     org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>>>                     >>    at
>>>>                     org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>>>                     >>    at
>>>>                     x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>>>                     >>
>>>>                     >> I spent some time googling but have no
>>>>                     idea what is wrong. I hope one of
>>>>                     you can give me a hint...
>>>>                     >>
>>>>                     >> Greets
>>>>                     >> Dominique
>>>>                     >>
>>>>                     >
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>
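
For anyone following the POJO part of this thread: below is a minimal sketch, in plain Java reflection, of the requirements mentioned in the quoted messages (public class, public no-argument constructor, non-public fields reachable through a public getter and setter). It is only an approximation under stated assumptions and is not Flink's TypeExtractor. The names PojoCheck, violations, Good and Mismatched are hypothetical and made up for illustration, and the get<Field>/is<Field> and set<Field> naming with matching types is an assumption of this sketch.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: approximates the documented POJO rules, not Flink's own check.
public class PojoCheck {

    /** Returns rule violations; an empty list means the class looks like a POJO. */
    public static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        boolean hasNoArgConstructor = false;
        for (Constructor<?> c : type.getConstructors()) { // public constructors only
            if (c.getParameterCount() == 0) {
                hasNoArgConstructor = true;
            }
        }
        if (!hasNoArgConstructor) {
            problems.add("no public no-argument constructor");
        }
        // Walk up the hierarchy so inherited fields such as sessionId are checked too.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(mod)
                        || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue; // public fields need no accessors in this sketch
                }
                if (!hasGetter(type, f)) {
                    problems.add("no public getter for field " + f.getName());
                }
                if (!hasSetter(type, f)) {
                    problems.add("no public setter for field " + f.getName());
                }
            }
        }
        return problems;
    }

    // Assumption: a getter is get<Field> or is<Field>, no parameters, same type as the field.
    private static boolean hasGetter(Class<?> type, Field field) {
        String name = field.getName().toLowerCase(Locale.ROOT);
        for (Method m : type.getMethods()) {
            String method = m.getName().toLowerCase(Locale.ROOT);
            boolean nameMatches = method.equals("get" + name) || method.equals("is" + name);
            if (nameMatches && m.getParameterCount() == 0
                    && m.getReturnType() == field.getType()) {
                return true;
            }
        }
        return false;
    }

    // Assumption: a setter is set<Field> with exactly one parameter of the field's type.
    private static boolean hasSetter(Class<?> type, Field field) {
        String name = field.getName().toLowerCase(Locale.ROOT);
        for (Method m : type.getMethods()) {
            if (m.getName().toLowerCase(Locale.ROOT).equals("set" + name)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == field.getType()) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical sample types, not the SourceA/SourceB classes from this thread.
    public static class Good {
        private String sessionId;
        public Good() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    public static class Mismatched extends Good {
        private Boolean outboundMessage;
        public Mismatched() { }
        // The getter name does not follow get<Field>/is<Field>, so the sketch flags it.
        public Boolean getOutbound() { return outboundMessage; }
        public void setOutboundMessage(Boolean value) { this.outboundMessage = value; }
    }

    public static void main(String[] args) {
        System.out.println(violations(Good.class));       // prints []
        System.out.println(violations(Mismatched.class)); // prints [no public getter for field outboundMessage]
    }
}
```

Running such a check on your own types before building the job may help spot a misnamed accessor early. Whether Flink accepts a given class is still decided by its own type extraction, so treat the result only as a hint.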

-- 
Dominique Rondé | Senior Consultant

codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland
mobil: +49 (0) 172.7182592
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

