flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
Date Thu, 26 May 2016 14:16:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302127#comment-15302127 ]

Stephan Ewen commented on FLINK-3974:

Yes, that is a pretty clear bug.

I guess the best workaround for now is to disable the object reuse mode. Object reuse does
not really work well in the DataStream API right now; it works pretty well in the
DataSet API.

Another quick workaround is to not chain the two different map functions, using {{.disableChaining()}}.

The solution should be quite straightforward, though:
  - Do not chain "splitting" flows any more. I would actually like that solution. For splitting flows, it seems like a good heuristic to start a new chain/thread by default.
  - Each collector should use its own dedicated stream record. That would at least circumvent the ClassCastException, but it would still be dangerous if the mappers actually alter the events.
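
The second option can be illustrated with a small pure-Java model, assuming a simplified hand-off in which a chained output passes one record instance to every downstream operator. MutableRecord, MapOperator, collect and run are hypothetical names invented for this sketch, not Flink classes or methods. Each mapper only reads the record it receives and writes its result into a record it owns, so the second mapper still sees the original A value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of "each collector uses its own dedicated record".
// MutableRecord, MapOperator and collect are illustrative stand-ins, not Flink classes.
public class DedicatedRecordSketch {

    static final class MutableRecord<T> {
        private T value;

        MutableRecord(T value) { this.value = value; }

        T getValue() { return value; }

        void setValue(T value) { this.value = value; }
    }

    static final class MapOperator<IN, OUT> {
        private final Function<IN, OUT> fn;
        // Output record owned by this operator and reused for every element it emits.
        private final MutableRecord<OUT> outputRecord = new MutableRecord<>(null);
        final List<OUT> emitted = new ArrayList<>();

        MapOperator(Function<IN, OUT> fn) { this.fn = fn; }

        void processElement(MutableRecord<IN> input) {
            // The input record is only read; the result goes into this operator's own record.
            outputRecord.setValue(fn.apply(input.getValue()));
            emitted.add(outputRecord.getValue());
        }
    }

    // Still passes the same input record to every downstream operator, which is now
    // safe for the record itself because no operator writes into it.
    static <T> void collect(MutableRecord<T> record, List<MapOperator<T, ?>> downstream) {
        for (MapOperator<T, ?> op : downstream) {
            op.processElement(record);
        }
    }

    static String run() {
        MapOperator<String, Integer> toLength = new MapOperator<>(String::length);    // A -> B
        MapOperator<String, String> toUpper = new MapOperator<>(String::toUpperCase); // A -> C
        List<MapOperator<String, ?>> downstream = List.of(toLength, toUpper);
        collect(new MutableRecord<>("flink"), downstream);
        return toLength.emitted + " " + toUpper.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [5] [FLINK]
    }
}
```

Because the event here is an immutable String, sharing the value itself is harmless. With a mutable event type, a mapper that modified the event in place would still affect the next mapper, which is the remaining danger of this option.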

> enableObjectReuse fails when an operator chains to multiple downstream operators
> --------------------------------------------------------------------------------
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> input
>     .map(MapFunction<A,C>...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}, which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the {{StreamRecord<A>}} to the second map operation, it is actually a {{StreamRecord<B>}}, and the two map operations behave as if they were serial instead of parallel.
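
The hand-off described in the report can be reproduced outside Flink with a minimal pure-Java model. MutableRecord, MapOperator, collect and run are hypothetical stand-ins (not Flink's StreamRecord or Output); A is modelled as String, B as Integer and C as String. The replace method only imitates the reuse behaviour the report attributes to StreamRecord<>.replace by writing the new value into the same holder, so the second mapper receives the first mapper's output.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the chained hand-off in FLINK-3974.
// MutableRecord, MapOperator and collect are illustrative stand-ins, not Flink classes.
public class SharedRecordRepro {

    // Plays the role of StreamRecord<T>: a mutable holder whose value can be swapped.
    static final class MutableRecord<T> {
        private Object value;

        MutableRecord(T value) { this.value = value; }

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        // Imitates the reuse behaviour of replace: the SAME holder now carries
        // a value of a different type and is returned retyped.
        @SuppressWarnings("unchecked")
        <X> MutableRecord<X> replace(X newValue) {
            this.value = newValue;
            return (MutableRecord<X>) (MutableRecord<?>) this;
        }
    }

    // A chained map operator that, as in object-reuse mode, writes its result
    // back into the record it was given instead of allocating a new one.
    static final class MapOperator<IN, OUT> {
        private final Function<IN, OUT> fn;

        MapOperator(Function<IN, OUT> fn) { this.fn = fn; }

        void processElement(MutableRecord<IN> record) {
            // The result overwrites the value in the shared input record.
            record.replace(fn.apply(record.getValue()));
        }
    }

    // Passes one and the same record instance to every downstream operator in turn.
    static <T> void collect(MutableRecord<T> record, List<MapOperator<T, ?>> downstream) {
        for (MapOperator<T, ?> op : downstream) {
            op.processElement(record);
        }
    }

    // Returns "ClassCastException" if the second mapper receives the first mapper's output.
    static String run() {
        MapOperator<String, Integer> toLength = new MapOperator<>(String::length);    // A -> B
        MapOperator<String, String> toUpper = new MapOperator<>(String::toUpperCase); // A -> C
        List<MapOperator<String, ?>> downstream = List.of(toLength, toUpper);
        try {
            collect(new MutableRecord<>("flink"), downstream);
            return "no error";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints ClassCastException
    }
}
```

The first mapper turns "flink" into the Integer 5 inside the shared holder, so the String::toUpperCase mapper is handed an Integer, mirroring the "B cannot be cast to A" failure from the report.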

This message was sent by Atlassian JIRA
