Subject: Re: AvroCoder + KafkaIO + Flink problem
From: Frances Perry
To: user@beam.apache.org
Date: Fri, 28 Apr 2017 09:34:18 -0700

I have the same problem and am working around it with SerializableCoder. +1
to a real solution.

On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek wrote:
> I think you could. But we should also try finding a solution for this
> problem.
>
> On 28. Apr 2017, at 17:31, Borisa Zivkovic wrote:
>
> Hi Aljoscha,
>
> this is probably the same problem I am facing.
>
> I execute multiple pipelines on the same Flink cluster - all launched at
> the same time...
>
> I guess I can try to switch to SerializableCoder and see how that works?
>
> thanks
>
> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek wrote:
>
>> Hi,
>> There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970.
>> Could this also be what is affecting you? Are you running several pipelines
>> on the same Flink cluster, either one after another or at the same time?
>>
>> Best,
>> Aljoscha
>>
>> On 28. Apr 2017, at 12:45, Borisa Zivkovic wrote:
>>
>> Hi,
>>
>> I have a small pipeline that writes data to Kafka (using AvroCoder),
>> and then another job reads the same data from Kafka, applies a few
>> transformations, and writes the data back to a different Kafka topic
>> (AvroCoder again).
>>
>> The first pipeline is very simple: read data from a text file, create a
>> POJO, and use AvroCoder to write the POJO to Kafka.
>>
>> The second pipeline is also simple: read the POJO from Kafka, apply a
>> few transformations, create a new POJO, and write it to Kafka using
>> AvroCoder again.
>>
>> When I use the direct runner everything is OK.
>>
>> When I switch to the Flink runner (a small remote Flink cluster) I get
>> this exception in the second pipeline:
>>
>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
>> test.MyClass
>>
>> This happens in the first MapFunction, immediately after reading data
>> from Kafka.
>>
>> I found out about this problem in Flink and how they resolve it there,
>> but I am not sure how to fix it when using Beam:
>>
>> https://issues.apache.org/jira/browse/FLINK-1390
>>
>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
>> very simple POJO.
>>
>> Not sure how to fix this and still continue using AvroCoder.
>>
>> My Beam version is 0.6.0; my Flink version is 1.2.0.
>>
>> Has anyone experienced something similar, or does anyone have an idea
>> how to fix/work around this?
>>
>> thanks
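[Editor's note] The workaround mentioned in the thread, switching the POJO from AvroCoder to SerializableCoder, can be sketched as below. This is a minimal, hypothetical stand-in for test.MyClass (the field names are invented); the Beam annotations are shown in comments so the snippet stays runnable with the JDK alone. SerializableCoder encodes elements with plain Java serialization, demonstrated here as a byte round-trip, which resolves the class freshly at read time rather than comparing against a class reference cached from a previous job's classloader, the pattern behind FLINK-1390-style ClassCastExceptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for test.MyClass. In the Beam pipeline the
// workaround is to replace
//     @DefaultCoder(AvroCoder.class)
// with
//     @DefaultCoder(SerializableCoder.class)
// and make the class implement java.io.Serializable, as here.
public class MyClass implements Serializable {
    private static final long serialVersionUID = 1L;

    final String id;
    final int value;

    MyClass(String id, int value) {
        this.id = id;
        this.value = value;
    }

    public static void main(String[] args) throws Exception {
        MyClass original = new MyClass("k1", 42);

        // Encode: what SerializableCoder does under the hood.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // Decode: the class is looked up through the deserializing
        // classloader, so a restarted job sees its own MyClass.
        MyClass copy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (MyClass) in.readObject();
        }
        System.out.println(copy.id + ":" + copy.value); // prints "k1:42"
    }
}
```

The trade-off, worth noting, is that Java serialization is slower and produces larger payloads than Avro, so this is a stopgap until BEAM-1970 is resolved.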