flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: coGroup Iterator NoSuchElement
Date Thu, 04 Jun 2015 09:22:04 GMT
I am not sure if I got your question right.

You can easily prevent the NoSuchElementException by calling next() only
if hasNext() returns true.
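
A minimal sketch of that guard, written so it runs without Flink on the classpath. Assumptions: Person and StudentInfo are simplified, hypothetical stand-ins for your classes, and a java.util.function.Consumer stands in for Flink's Collector. Passing the Person through unchanged when it has no StudentInfo is one possible choice, not the only one; it keeps the record available for a later coGroup.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class CoGroupGuardSketch {

    // Hypothetical, simplified stand-in for the Person type in the thread.
    static class Person {
        final String name;
        String major;
        final List<String> bestCourse = new ArrayList<>();

        Person(String name) {
            this.name = name;
        }
    }

    // Hypothetical, simplified stand-in for the StudentInfo type in the thread.
    static class StudentInfo {
        final String major;
        final List<String> courses;

        StudentInfo(String major, List<String> courses) {
            this.major = major;
            this.courses = courses;
        }
    }

    // Same shape as a coGroup method; Consumer is an assumed stand-in for Collector.
    static void coGroup(Iterable<Person> persons,
                        Iterable<StudentInfo> infos,
                        Consumer<Person> out) {
        Iterator<Person> personIt = persons.iterator();
        if (!personIt.hasNext()) {
            // The key exists only on the StudentInfo side: there is no Person to update.
            return;
        }
        Person person = personIt.next();

        Iterator<StudentInfo> infoIt = infos.iterator();
        if (infoIt.hasNext()) {
            // The first StudentInfo decides the major; all of them contribute courses.
            StudentInfo first = infoIt.next();
            person.major = first.major;
            person.bestCourse.addAll(first.courses);
            while (infoIt.hasNext()) {
                person.bestCourse.addAll(infoIt.next().courses);
            }
        }
        // Emit the Person even when the StudentInfo side was empty (assumed design choice).
        out.accept(person);
    }
}
```

Note that a version which collects the Person only when the second side is non-empty drops every Person without a match. A following coGroup can then see keys whose Person side is empty, which is exactly the case the hasNext() check protects against.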

2015-06-04 11:18 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:

> Yes, it's working now. But my assumption is that I want to join different
> datasets on the common key, so it will be normal to have many tuples on
> one side which do not exist on the other side.
>
> How can I fix that?
>
> On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi,
>>
>> one of the iterables of a CoGroup function can be empty. Calling
>> iterator.next() on an empty iterator raises the NoSuchElementException.
>> This is the expected behavior of the function.
>>
>> Are you sure your assumptions about your data are correct, i.e., that the
>> iterator should always have (at least) one element?
>>
>> Fabian
>>
>>
>> 2015-06-04 10:47 GMT+02:00 Mustafa Elbehery <elbeherymustafa@gmail.com>:
>>
>>> Hi,
>>>
>>>
>>> public static class ComputeStudiesProfile implements CoGroupFunction<Person, StudentInfo, Person> {
>>>
>>>    Person person;
>>>
>>>    @Override
>>>    public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo> iterable1, Collector<Person> collector) throws Exception {
>>>
>>>       Iterator<Person> iterator = iterable.iterator();
>>>       person = iterator.next();
>>>
>>>       ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
>>>       Iterator<StudentInfo> infosIterator = iterable1.iterator();
>>>
>>>       while(infosIterator.hasNext())
>>>             infos.add(infosIterator.next());
>>>
>>>       if (infos.size() > 0) {
>>>          update(person, infos, collector);
>>>       }
>>>    }
>>>
>>>    public void update(Person person, Collection<StudentInfo> infos, Collector<Person> collector) {
>>>       person.setMajor(infos.iterator().next().getMajor());
>>>       for(StudentInfo info : infos){
>>>          person.getBestCourse().addAll(info.getCourses());
>>>       }
>>>       collector.collect(person);
>>>    }
>>> }
>>>
>>>  *******************************************************************************************************
>>>
>>>
>>> public static class ComputeJobsProfile implements CoGroupFunction<Person, StudentJobs, Person> {
>>>
>>>    @Override
>>>    public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs> iterable1, Collector<Person> collector) throws Exception {
>>>
>>>       Person person = iterable.iterator().next();
>>>
>>>       ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
>>>       for (StudentJobs job : iterable1) {
>>>          jobs.add(job);
>>>       }
>>>       if (jobs.size() > 0) {
>>>          update(person, jobs, collector);
>>>       }
>>>    }
>>>
>>>    public void update(Person person, Collection<StudentJobs> jobs, Collector<Person> collector) {
>>>
>>>       for(StudentJobs job : jobs){
>>>          person.getJobs().addAll(job.getJobs());
>>>       }
>>>       collector.collect(person);
>>>    }
>>> }
>>>
>>>
>>> On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> The code snippet is not very revealing. Can you also show the
>>>> implementations of the CoGroupFunctions?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery <
>>>> elbeherymustafa@gmail.com> wrote:
>>>>
>>>>> Code Snippet :)
>>>>>
>>>>> DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
>>>>>                            .where("name").equalTo("name")
>>>>>                            .with(new ComputeStudiesProfile());
>>>>>
>>>>> DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
>>>>>                            .where("name").equalTo("name")
>>>>>                            .with(new ComputeJobsProfile());
>>>>>
>>>>> updatedPersonTwo.print();
>>>>>
>>>>>
>>>>> On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery <
>>>>> elbeherymustafa@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run two coGroups in sequence in the same ETL job. I
>>>>>> use a common dataset in both of them: in the first coGroup I update the
>>>>>> initial dataset and retrieve the result in a new dataset object. Then I
>>>>>> use the result in the second coGroup with another new dataset.
>>>>>>
>>>>>> While debugging, I could see that coGroup.next is *false*; however,
>>>>>> in the next iteration it has elements. I tried force-enabling
>>>>>> ObjectReuse and got *half* of the expected result. I have attached a
>>>>>> screenshot from the debugger.
>>>>>>
>>>>>> My question is: is this related to the concurrent execution of
>>>>>> different tasks in Flink? And how can I solve this problem?
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Mustafa Elbehery
>>>>>> EIT ICT Labs Master School
>>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>>> +49(0)15750363097
>>>>>> skype: mustafaelbehery87
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
