Date: Wed, 22 Jun 2016 15:38:57 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

    [ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344537#comment-15344537 ]

ASF GitHub Bot commented on FLINK-3974:
---------------------------------------

Github user tillrohrmann
commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68076977

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -306,8 +306,9 @@ public ChainingOutput(OneInputStreamOperator operator) {
                @Override
                public void collect(StreamRecord record) {
                    try {
    -                    operator.setKeyContextElement1(record);
    -                    operator.processElement(record);
    +                    StreamRecord shallowCopy = record.copy(record.getValue());
    +                    operator.setKeyContextElement1(shallowCopy);
    +                    operator.processElement(shallowCopy);
    --- End diff --

    Actually, I'm wondering whether the `ChainingOutput` is the right place to do this copying. Wouldn't it make more sense to do it in the `BroadcastingOutputCollector`? Only when we have a branching chained data flow do we have to make sure that every downstream operator gets its own copy of the record. For simple chaining, it should be correct to reuse the stream record.

    So I would adapt the `collect` method of `BroadcastingOutputCollector` as follows:

    ```
    public void collect(StreamRecord record) {
        // Guard: with no outputs, outputs[outputs.length - 1] below would be out of bounds
        if (outputs.length == 0) {
            return;
        }

        // Every output except the last gets its own shallow copy of the record, so an
        // in-place StreamRecord.replace() in one branch cannot leak into another branch
        for (int i = 0; i < outputs.length - 1; i++) {
            StreamRecord shallowCopy = record.copy(record.getValue());
            outputs[i].collect(shallowCopy);
        }

        // The last output can safely reuse the original record
        outputs[outputs.length - 1].collect(record);
    }
    ```

> enableObjectReuse fails when an operator chains to multiple downstream operators
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
>     .map(MapFunction...)
>     .addSink(...);
> input
>     .map(MapFunction...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception of the form {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}, which loops over the downstream operators and passes the record to each of them.
> However, the first map operation calls {{StreamRecord<>.replace}}, which mutates the value stored in the StreamRecord<> in place.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the {{StreamRecord<A>}} to the second map operation, it is actually a {{StreamRecord<B>}}, and the topology behaves as if the two map operations were serial instead of parallel.
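To make the failure mode and the proposed fix concrete without a Flink dependency, here is a minimal Java sketch. Everything in it is hypothetical and is not Flink's implementation: `DemoRecord` stands in for `StreamRecord` (its `replace`/`copy` methods only mimic the semantics described in the issue and the diff), `Output` stands in for the chained output interface, `String` plays type A and `Integer` plays type B. It models the shared-record broadcast reported in the issue next to the copy-all-but-last strategy suggested in the review.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Flink-free model of FLINK-3974. DemoRecord, Output and the helper methods are
 * hypothetical stand-ins for StreamRecord, Output and the chained collectors.
 */
public class ObjectReuseDemo {

    /** Mutable record holder, loosely modeled on StreamRecord. */
    static final class DemoRecord {
        Object value;

        DemoRecord(Object value) {
            this.value = value;
        }

        /** Overwrites the value in place (the object-reuse behavior of StreamRecord.replace). */
        DemoRecord replace(Object newValue) {
            this.value = newValue;
            return this;
        }

        /** Shallow copy: a new holder around the given value (mirrors StreamRecord.copy). */
        DemoRecord copy(Object valueCopy) {
            return new DemoRecord(valueCopy);
        }
    }

    /** Downstream consumer of records, standing in for a chained operator's output. */
    interface Output {
        void collect(DemoRecord record);
    }

    /** Chained "map" from A = String to B = Integer that reuses the incoming record. */
    static Output mapToLength(List<Integer> sink) {
        return record -> {
            String in = (String) record.value; // expects an A; throws if it already holds a B
            record.replace(in.length());       // reuse: the record now holds a B
            sink.add((Integer) record.value);
        };
    }

    /** Hands the SAME record object to every output (the behavior reported in the issue). */
    static Output broadcasting(Output... outputs) {
        return record -> {
            for (Output output : outputs) {
                output.collect(record);
            }
        };
    }

    /** Copies for all outputs but the last, as proposed in the review comment. */
    static Output copyingBroadcasting(Output... outputs) {
        return record -> {
            if (outputs.length == 0) {
                return;
            }
            for (int i = 0; i < outputs.length - 1; i++) {
                outputs[i].collect(record.copy(record.value));
            }
            outputs[outputs.length - 1].collect(record);
        };
    }

    /** Sends "hello" through two branched maps; returns both sinks, or the exception name. */
    static String run(boolean copying) {
        List<Integer> sink1 = new ArrayList<>();
        List<Integer> sink2 = new ArrayList<>();
        Output first = mapToLength(sink1);
        Output second = mapToLength(sink2);
        Output source = copying ? copyingBroadcasting(first, second) : broadcasting(first, second);
        try {
            source.collect(new DemoRecord("hello"));
            return sink1 + " " + sink2;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println("shared record:  " + run(false)); // ClassCastException
        System.out.println("copied records: " + run(true));  // [5] [5]
    }
}
```

Handing the original record to the last output saves one allocation per element. Note that the copy is shallow: in this sketch it isolates the record holder, which is what `replace` overwrites, but it would not protect the branches from a function that mutated the value object itself.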