flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
Date Wed, 25 Oct 2017 16:46:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-7919:
---------------------------------
    Description: 
A job with a delta iteration fails hard with an NPE in the solution set join if the solution
set has no entry for the join key of the probe side.

The following program reproduces the problem:

{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5,0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modify the key so that it joins on a non-existent solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {

    @Override
    public Tuple2<Long, Integer> join(
      Tuple2<Long, Integer> first, 
      Tuple2<Long, Integer> second) throws Exception {
      
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}

It doesn't matter whether the solution set is managed or not. 

The problem is caused by the solution set hash table prober, which returns {{null}} if
the solution set does not contain a value for the probe side key.
The join operator does not check whether the returned value is {{null}} but immediately tries
to create a copy using a {{TypeSerializer}}. This copy fails with an NPE.
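
The failure mode described above can be modeled in plain Java. This is only a minimal sketch and not Flink's operator code: the class {{SolutionSetProbeSketch}} and the methods {{probe}}, {{copy}} and {{failsWithNpe}} are hypothetical stand-ins, with a {{HashMap}} playing the role of the solution set hash table and an array clone playing the role of the serializer copy.

```java
import java.util.HashMap;
import java.util.Map;

class SolutionSetProbeSketch {

    // Hypothetical stand-in for the hash table prober: returns null on a miss.
    static int[] probe(Map<Long, int[]> solutionSet, long key) {
        return solutionSet.get(key);
    }

    // Hypothetical stand-in for a serializer-style copy: it dereferences its
    // argument, so a null record raises a NullPointerException.
    static int[] copy(int[] record) {
        return record.clone();
    }

    // Copies the probe result without a null check, as described in the issue,
    // and reports whether that unchecked copy failed with an NPE.
    static boolean failsWithNpe(Map<Long, int[]> solutionSet, long key) {
        try {
            copy(probe(solutionSet, key));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Long, int[]> solutionSet = new HashMap<>();
        solutionSet.put(1L, new int[] {1});

        System.out.println("key 1 fails: " + failsWithNpe(solutionSet, 1L));
        System.out.println("key 2 fails: " + failsWithNpe(solutionSet, 2L));
    }
}
```

In this model only the lookup for the missing key 2 fails, which mirrors the reproducer: the map function shifts every key by one, so at least one probe key has no solution set entry.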

I propose to check for {{null}} and call the join function with {{null}} on the solution set
side. This gives the join OUTER JOIN semantics.
Since the code previously failed with an NPE in this case, no working program can depend on the
old behavior, so it is safe to forward the {{null}} into the {{JoinFunction}}.

However, users must be aware that the solution set value may be {{null}}, and we need to update
the documentation (JavaDocs + website) to describe the behavior.
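
To illustrate what the proposed behavior would mean for user code, here is a minimal plain-Java sketch under stated assumptions. It does not use Flink's API: the interface {{Join}}, the driver {{joinWithSolutionSet}} and the function {{sumOrKeep}} are hypothetical names, tuples are modeled as {{long[] {key, value}}}, and treating a missing entry as 0 is just one possible choice for the user function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NullSafeSolutionSetJoinSketch {

    // Hypothetical two-argument join callback; 'second' may be null.
    public interface Join {
        long[] join(long[] first, Long second);
    }

    // Probes the solution set for every workset record. On a miss, the
    // callback is still invoked, with null on the solution set side
    // (outer join semantics as proposed in the issue).
    public static List<long[]> joinWithSolutionSet(
            List<long[]> workset, Map<Long, Long> solutionSet, Join fn) {
        List<long[]> out = new ArrayList<>();
        for (long[] probe : workset) {
            Long match = solutionSet.get(probe[0]); // null if key is absent
            out.add(fn.join(probe, match));
        }
        return out;
    }

    // User-side logic that tolerates a missing solution set entry.
    // Assumption: a missing entry contributes 0 to the sum.
    public static long[] sumOrKeep(long[] first, Long second) {
        long other = (second == null) ? 0L : second;
        return new long[] {first[0], first[1] + other};
    }

    public static void main(String[] args) {
        Map<Long, Long> solutionSet = new HashMap<>();
        solutionSet.put(1L, 1L);
        solutionSet.put(2L, 1L);
        solutionSet.put(3L, 1L);

        // Workset keys shifted by one, as in the reproducer's map function.
        List<long[]> workset = Arrays.asList(
            new long[] {2, 1}, new long[] {3, 1}, new long[] {4, 1});

        for (long[] r : joinWithSolutionSet(
                workset, solutionSet, NullSafeSolutionSetJoinSketch::sumOrKeep)) {
            System.out.println(r[0] + " -> " + r[1]);
        }
    }
}
```

In this sketch key 4 has no solution set entry, so the callback receives {{null}} and the record keeps its value of 1, while keys 2 and 3 are summed to 2. A join function written like the one in the reproducer, which reads {{second.f1}} unconditionally, would still need such a null check under the proposed semantics.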

  was:
A job with a delta iteration fails hard with an NPE in the solution set join if the solution
set has no entry for the join key of the probe side.

The following program reproduces the problem:

{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5,0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modify the key so that it joins on a non-existent solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {

    @Override
    public Tuple2<Long, Integer> join(
      Tuple2<Long, Integer> first, 
      Tuple2<Long, Integer> second) throws Exception {
      
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}

It doesn't matter whether the solution set is managed or not. 

The problem is caused by the solution set hash table prober, which returns {{null}} if
the solution set does not contain a value for the probe side key.
The join operator does not check whether the returned value is {{null}} but immediately tries
to create a copy using a {{TypeSerializer}}. This copy fails with an NPE.

There are two solutions:
1. Check for {{null}} and do not call the join function (INNER join semantics)
2. Check for {{null}} and call the join function with {{null}} on the solution set side (OUTER join semantics)

Either way, the chosen behavior should be documented.


> Join with Solution Set fails with NPE if Solution Set has no entry
> ------------------------------------------------------------------
>
>                 Key: FLINK-7919
>                 URL: https://issues.apache.org/jira/browse/FLINK-7919
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API, Local Runtime
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Fabian Hueske



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
