flink-user mailing list archives

From Mustafa Elbehery <elbeherymust...@gmail.com>
Subject Re: coGroup Iterator NoSuchElement
Date Thu, 04 Jun 2015 09:18:36 GMT
Yes, it's working now. But my assumption is that I want to join different
datasets on a common key, so it is normal to have many tuples on one
side that do not exist on the other side.

How can I fix that?
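A minimal sketch of one way to handle keys that appear on only one side. It is written in plain Java so it runs without Flink: `Person` and `StudentInfo` are simplified, hypothetical stand-ins for the classes in this thread, and a `List` stands in for Flink's `Collector`. The static `coGroup` method shows what the body of `ComputeStudiesProfile.coGroup(...)` could look like under one assumed semantics (left outer join): a key with no `Person` is skipped, and a `Person` with no matching `StudentInfo` is emitted unchanged instead of being dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class OneSidedCoGroup {

    // Hypothetical, simplified stand-in for the thread's Person class.
    static class Person {
        final String name;
        String major;
        Person(String name) { this.name = name; }
    }

    // Hypothetical, simplified stand-in for the thread's StudentInfo class.
    static class StudentInfo {
        final String name;
        final String major;
        StudentInfo(String name, String major) { this.name = name; this.major = major; }
    }

    // Body of one coGroup call for a single key; 'out' plays the role of Flink's Collector.
    static void coGroup(Iterable<Person> persons, Iterable<StudentInfo> infos, List<Person> out) {
        Iterator<Person> personIt = persons.iterator();
        if (!personIt.hasNext()) {
            // Key exists only on the StudentInfo side: there is no person to update.
            return;
        }
        Person person = personIt.next();

        Iterator<StudentInfo> infoIt = infos.iterator();
        if (infoIt.hasNext()) {
            // Key exists on both sides: take the major from the first matching record.
            person.major = infoIt.next().major;
        }
        // Emit the person in both cases so unmatched persons are not lost.
        out.add(person);
    }

    public static void main(String[] args) {
        List<Person> out = new ArrayList<>();
        coGroup(List.of(new Person("alice")), List.of(new StudentInfo("alice", "CS")), out);
        coGroup(List.of(new Person("bob")), Collections.emptyList(), out);
        coGroup(Collections.emptyList(), List.of(new StudentInfo("carol", "Math")), out);
        for (Person p : out) {
            System.out.println(p.name + " -> " + p.major);
        }
    }
}
```

Running `main` prints `alice -> CS` and `bob -> null`; `carol` is skipped because no `Person` exists for that key. Whether unmatched persons should be emitted or dropped depends on the intended join semantics: the original `ComputeStudiesProfile` only calls `collector.collect` when `infos.size() > 0`, so persons without student info do not appear in `updatedPersonOne` at all.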

On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi,
>
> one of the iterables of a CoGroup function can be empty. Calling
> iterator.next() on an empty iterator raises a NoSuchElementException.
> This is the expected behavior of the function.
>
> Are you sure your assumptions about your data are correct, i.e., that the
> iterator should always have (at least) one element?
>
> Fabian
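The behavior described above can be reproduced in plain Java without any Flink classes: `next()` on an iterator over an empty `Iterable` throws `NoSuchElementException`, while checking `hasNext()` first avoids it. The helper name `firstOrNull` is hypothetical and only illustrates the guard.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EmptyIterableDemo {

    // Hypothetical helper: returns the first element, or null if the iterable is empty.
    static <T> T firstOrNull(Iterable<T> iterable) {
        Iterator<T> it = iterable.iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        List<String> empty = Collections.emptyList();
        try {
            // Same call pattern as iterable.iterator().next() in the coGroup functions.
            empty.iterator().next();
        } catch (NoSuchElementException e) {
            System.out.println("next() on an empty iterator threw NoSuchElementException");
        }
        System.out.println(firstOrNull(empty));            // prints "null"
        System.out.println(firstOrNull(List.of("alice"))); // prints "alice"
    }
}
```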
>
>
> 2015-06-04 10:47 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:
>
>> Hi,
>>
>>
>> public static class ComputeStudiesProfile implements CoGroupFunction<Person, StudentInfo, Person> {
>>
>>    Person person;
>>
>>    @Override
>>    public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo> iterable1,
>>                        Collector<Person> collector) throws Exception {
>>
>>       Iterator<Person> iterator = iterable.iterator();
>>       // Throws NoSuchElementException if this key has no Person record.
>>       person = iterator.next();
>>
>>       ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
>>       Iterator<StudentInfo> infosIterator = iterable1.iterator();
>>
>>       while(infosIterator.hasNext())
>>             infos.add(infosIterator.next());
>>
>>       if (infos.size() > 0) {
>>          update(person, infos, collector);
>>       }
>>    }
>>
>>    public void update(Person person, Collection<StudentInfo> infos, Collector<Person> collector) {
>>       person.setMajor(infos.iterator().next().getMajor());
>>       for(StudentInfo info : infos){
>>          person.getBestCourse().addAll(info.getCourses());
>>       }
>>       collector.collect(person);
>>    }
>> }
>>
>>  *******************************************************************************************************
>>
>>
>> public static class ComputeJobsProfile implements CoGroupFunction<Person, StudentJobs, Person> {
>>
>>    @Override
>>    public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs> iterable1,
>>                        Collector<Person> collector) throws Exception {
>>
>>       // Throws NoSuchElementException if this key has no Person record.
>>       Person person = iterable.iterator().next();
>>
>>       ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
>>       for (StudentJobs job : iterable1) {
>>          jobs.add(job);
>>       }
>>       if (jobs.size() > 0) {
>>          update(person, jobs, collector);
>>       }
>>    }
>>
>>    public void update(Person person, Collection<StudentJobs> jobs, Collector<Person> collector) {
>>
>>       for(StudentJobs job : jobs){
>>          person.getJobs().addAll(job.getJobs());
>>       }
>>       collector.collect(person);
>>    }
>> }
>>
>>
>> On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi!
>>>
>>> The code snippet is not very revealing. Can you also show the
>>> implementations of the CoGroupFunctions?
>>>
>>> Thanks!
>>>
>>> On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery <
>>> elbeherymustafa@gmail.com> wrote:
>>>
>>>> Code Snippet :)
>>>>
>>>> DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
>>>>                            .where("name").equalTo("name")
>>>>                            .with(new ComputeStudiesProfile());
>>>>
>>>> DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
>>>>                            .where("name").equalTo("name")
>>>>                            .with(new ComputeJobsProfile());
>>>>
>>>> updatedPersonTwo.print();
>>>>
>>>>
>>>> On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery <
>>>> elbeherymustafa@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to write two coGroups in sequence in the same ETL job. I
>>>>> use a common dataset in both of them: in the first coGroup I update the
>>>>> initial dataset and retrieve the result as a new DataSet object. Then I
>>>>> use the result in the second coGroup with another new dataset.
>>>>>
>>>>> While debugging, I could see that coGroup.next is *false*; however,
>>>>> in the next iteration it has elements. I tried forcing ObjectReuse to be
>>>>> enabled and got *half* of the expected result. I have attached a
>>>>> screenshot of the debugger.
>>>>>
>>>>> My question is: is this related to the concurrent execution of
>>>>> different tasks in Flink? And how can I solve this problem?
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>> --
>>>>> Mustafa Elbehery
>>>>> EIT ICT Labs Master School
>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>> +49(0)15750363097
>>>>> skype: mustafaelbehery87
>>>>>
>>>>>
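The message above mentions enabling object reuse. When object reuse is switched on (for example via `ExecutionConfig.enableObjectReuse()`), Flink may hand a user function the same object instance for successive records, so code that buffers input references, like the `ArrayList<StudentInfo>` in `ComputeStudiesProfile`, can end up holding many references to one object. Whether this explains the "half" result in this thread is not established. The sketch below only simulates the aliasing effect in plain Java: `Row` is a hypothetical mutable record, and `reusingIterable` is a hypothetical iterator that refills one shared instance, standing in for a reusing runtime.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ObjectReuseDemo {

    // Hypothetical mutable record standing in for a reused input object.
    static class Row {
        String name;

        Row copy() {
            Row r = new Row();
            r.name = this.name;
            return r;
        }
    }

    // Simulates an input iterator that refills one shared Row instance for every element.
    static Iterable<Row> reusingIterable(List<String> names) {
        return () -> new Iterator<Row>() {
            private final Row reused = new Row();
            private int i = 0;

            @Override public boolean hasNext() { return i < names.size(); }

            @Override public Row next() {
                if (!hasNext()) throw new NoSuchElementException();
                reused.name = names.get(i++);
                return reused;
            }
        };
    }

    public static void main(String[] args) {
        List<String> names = List.of("a", "b", "c");

        // Buffering references: every list entry points to the same reused object.
        List<Row> aliased = new ArrayList<>();
        for (Row r : reusingIterable(names)) {
            aliased.add(r);
        }

        // Buffering copies: each list entry keeps its own values.
        List<Row> copied = new ArrayList<>();
        for (Row r : reusingIterable(names)) {
            copied.add(r.copy());
        }

        aliased.forEach(r -> System.out.print(r.name + " ")); // prints "c c c "
        System.out.println();
        copied.forEach(r -> System.out.print(r.name + " "));  // prints "a b c "
        System.out.println();
    }
}
```

Copying each record before storing it is the conservative choice when a function buffers its input and object reuse may be enabled; with reuse disabled, the copy is redundant but harmless.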
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>


