From: "Paschek, Robert"
To: user@flink.apache.org
Date: Sat, 30 Jul 2016 10:04:25 +0000
Subject: Serialization of "not a valid POJO type"

Hi Mailing List,

 

Following up on my questions (and your answers!) in this thread:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Performance-issues-with-GroupBy-td8130.html

 

I have eliminated the ArrayList<T> from my collect() calls. Additionally, I want to emit partial results. My mapper has the following layout:

 

ArrayList<ArrayList<Tuple>> structure = …;

for (Tuple tuple : input) {
    addTupleToStructure(tuple);
}

while (workNotDone) {
    doSomeWorkOnStructure();
    emitPartialResult();
}
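To make the layout above concrete, here is a minimal plain-Java sketch of it. It is not Flink code: Integer stands in for Tuple, a java.util.function.Consumer stands in for Flink's Collector, and the bodies of addTupleToStructure, doSomeWorkOnStructure and the stop check (workDone) are hypothetical placeholders, since the real ones are not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the mapper layout (no Flink types).
// Assumptions: Integer stands in for Tuple, Consumer stands in for Flink's Collector,
// and the helper bodies below are hypothetical placeholders.
final class PartialResultMapperSketch {
    private final ArrayList<ArrayList<Integer>> structure = new ArrayList<>();

    void run(Iterable<Integer> input, Consumer<List<Integer>> out) {
        // Phase 1: build the structure from the whole input.
        for (Integer tuple : input) {
            addTupleToStructure(tuple);
        }
        // Phase 2: work on the structure, emitting a partial result after each step.
        while (!workDone()) {
            doSomeWorkOnStructure();
            emitPartialResult(out);
        }
    }

    // Hypothetical: every tuple starts in its own bucket.
    private void addTupleToStructure(Integer tuple) {
        ArrayList<Integer> bucket = new ArrayList<>();
        bucket.add(tuple);
        structure.add(bucket);
    }

    // Hypothetical stop condition: at most one bucket is left.
    private boolean workDone() {
        return structure.size() <= 1;
    }

    // Hypothetical unit of work: fold the last bucket into the one before it.
    private void doSomeWorkOnStructure() {
        ArrayList<Integer> last = structure.remove(structure.size() - 1);
        structure.get(structure.size() - 1).addAll(last);
    }

    // Emits a copy of the bucket that just changed, so later work cannot mutate it.
    private void emitPartialResult(Consumer<List<Integer>> out) {
        out.accept(new ArrayList<>(structure.get(structure.size() - 1)));
    }
}
```

For the input 1, 2, 3 this sketch emits [2, 3] and then [1, 2, 3], i.e. one partial result per unit of work.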

 

Instead of emitting the partial result as an ArrayList<T> ("not a valid POJO type"), I now iterate through this ArrayList<T> and emit each tuple as Tuple2.of(Integer.valueOf(this.partitionIndex), tuple).

While iterating through this ArrayList and emitting tuples, my mapper seems to be blocked and cannot continue with doSomeWorkOnStructure().
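The two emission variants being compared (one record per tuple versus one record per list) can be sketched in plain Java as follows. Map.Entry<Integer, T> is an assumed stand-in for Flink's Tuple2<Integer, T> and a Consumer for Flink's Collector; the method names emitEachTuple and emitWholeList are hypothetical. The sketch only shows the shape of the emitted records, not how Flink serializes or ships them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch of the two emission variants (no Flink types).
// Assumptions: Map.Entry stands in for Tuple2, Consumer stands in for Flink's Collector.
final class EmissionVariants {
    private EmissionVariants() {}

    // Current variant: one record per tuple, tagged with the partition index,
    // analogous to collecting Tuple2.of(partitionIndex, tuple) for every tuple.
    static <T> void emitEachTuple(int partitionIndex, List<T> partialResult,
                                  Consumer<Map.Entry<Integer, T>> out) {
        for (T tuple : partialResult) {
            out.accept(Map.entry(partitionIndex, tuple));
        }
    }

    // Alternative: a single record carrying the whole partial result.
    static <T> void emitWholeList(int partitionIndex, List<T> partialResult,
                                  Consumer<Map.Entry<Integer, List<T>>> out) {
        // Copy, so later work on the structure cannot change an already-emitted record.
        out.accept(Map.entry(partitionIndex, new ArrayList<>(partialResult)));
    }
}
```

With n tuples in a partial result, the first variant produces n small records and the second produces one larger record.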

 

So I have three questions:

- If I change back to emitting the ArrayList<T>, would my mapper also be blocked until Flink has serialized this ArrayList<T>? Or is serialization done independently of my mapper?

 

- If emitting the ArrayList<T> won't block my mapper, which variant would perform better?

 

- If I emit ArrayList<T> but additionally implement a combiner, which

  o merges all local ArrayList<T>s with the same partitionIndex, and

  o iterates through the locally merged ArrayList<T>s and emits the contained tuples,

would that be the best variant? Because the combining is done locally, I would assume that no serialization is required between mapper and combiner. Also, the mapper would probably not be blocked by emitting tuples and could continue with doSomeWorkOnStructure().
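The combiner described in the third question (merge the local lists per partitionIndex, then emit the contained tuples) could look roughly like this plain-Java sketch. Map.Entry again stands in for Tuple2 and a Consumer for Flink's Collector; the class and method names are hypothetical, and an actual Flink combiner would be written against Flink's own interfaces (for example GroupCombineFunction) rather than this standalone method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch of the proposed merge-then-flatten combiner (no Flink types).
// Assumptions: Map.Entry stands in for Tuple2, Consumer stands in for Flink's Collector.
final class MergeAndFlattenCombiner {
    private MergeAndFlattenCombiner() {}

    static <T> void combine(Iterable<Map.Entry<Integer, List<T>>> partialLists,
                            Consumer<Map.Entry<Integer, T>> out) {
        // Step 1: merge all local lists that share a partitionIndex.
        // LinkedHashMap keeps partitions in first-seen order.
        Map<Integer, List<T>> merged = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<T>> record : partialLists) {
            merged.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                  .addAll(record.getValue());
        }
        // Step 2: iterate the merged lists and emit each contained tuple,
        // still tagged with its partitionIndex.
        for (Map.Entry<Integer, List<T>> group : merged.entrySet()) {
            for (T tuple : group.getValue()) {
                out.accept(Map.entry(group.getKey(), tuple));
            }
        }
    }
}
```

Because the map preserves first-seen order, the output order is deterministic, which keeps the sketch easy to check.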

 

Thank you in advance!

Robert

 

 
