Return-Path: X-Original-To: apmail-flink-user-archive@minotaur.apache.org Delivered-To: apmail-flink-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB76F18E07 for ; Tue, 19 Jan 2016 12:33:34 +0000 (UTC) Received: (qmail 66712 invoked by uid 500); 19 Jan 2016 12:33:34 -0000 Delivered-To: apmail-flink-user-archive@flink.apache.org Received: (qmail 66627 invoked by uid 500); 19 Jan 2016 12:33:34 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 66617 invoked by uid 99); 19 Jan 2016 12:33:34 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jan 2016 12:33:34 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 1A7D71A065A for ; Tue, 19 Jan 2016 12:33:34 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1 X-Spam-Level: * X-Spam-Status: No, score=1 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id I6uh3jdwziag for ; Tue, 19 Jan 2016 12:33:32 +0000 (UTC) Received: from mx1.mailbox.org (mx1.mailbox.org [80.241.60.212]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with ESMTPS id 7E9E62054B for ; Tue, 19 Jan 2016 12:33:32 +0000 (UTC) Received: from smtp1.mailbox.org (smtp1.mailbox.org [80.241.60.240]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by mx1.mailbox.org (Postfix) with ESMTPS id 9866043A3B for ; Tue, 19 Jan 2016 13:33:30 +0100 (CET) X-Virus-Scanned: amavisd-new at heinlein-support.de Received: from smtp1.mailbox.org ([80.241.60.240]) by gerste.heinlein-support.de (gerste.heinlein-support.de [91.198.250.173]) (amavisd-new, port 10030) with ESMTP id gmqd9W5eo0Zd for ; Tue, 19 Jan 2016 13:33:29 +0100 (CET) Subject: Re: Flink Stream: collect in an array all records within a window To: user@flink.apache.org References: <569D412B.2070904@apache.org> <569E0336.6000804@apache.org> From: "Matthias J. Sax" Message-ID: <569E2CE1.3080001@apache.org> Date: Tue, 19 Jan 2016 13:32:33 +0100 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Icedove/38.5.0 MIME-Version: 1.0 In-Reply-To: Content-Type: multipart/signed; micalg=pgp-sha256; protocol="application/pgp-signature"; boundary="2Rv4UK0vRwrTPtGNXlrrep5RqVkmrMHih" This is an OpenPGP/MIME signed message (RFC 4880 and 3156) --2Rv4UK0vRwrTPtGNXlrrep5RqVkmrMHih Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable What type is your DataStream? It must be DataStream[String] to work with SimpleStringSchema. If you have a different type, just implement a customized SerializationSchema. -Matthias On 01/19/2016 11:26 AM, Saiph Kappa wrote: > When I use SimpleStringSchema I get the error: Type mismatch, expected:= > SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I= > think SimpleStringSchema extends SerializationSchema[String], and > therefore cannot be used as argument of writeToSocket. Can you confirm > this please? >=20 > s.writeToSocket(host, port.toInt, new SimpleStringSchema()) >=20 >=20 > Thanks. >=20 > On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax > wrote: >=20 > There is SimpleStringSchema. >=20 > -Matthias >=20 > On 01/18/2016 11:21 PM, Saiph Kappa wrote: > > Hi Matthias, > > > > Thanks for your response. The method .writeToSocket seems to be w= hat I > > was looking for. Can you tell me what kind of serialization schem= a > > should I use assuming my socket server receives strings. I have > > something like this in scala: > > > > |val server =3DnewServerSocket(9999)while(true){val s =3Dserver.a= ccept()val > > in=3DnewBufferedSource(s.getInputStream()).getLines()println(in.n= ext())s.close()} > > > > | > > > > Thanks| > > | > > > > > > > > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax > > >> wrote: > > > > Hi Saiph, > > > > you can use AllWindowFunction via .apply(...) to get an > .collect method: > > > > From: > > =20 > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/= streaming_guide.html > > > > > // applying an AllWindowFunction on non-keyed window stream= > > > allWindowedStream.apply (new > > AllWindowFunction, Integer, Window>() = { > > > public void apply (Window window, > > > Iterable> values, > > > Collector out) throws Exception { > > > int sum =3D 0; > > > for (value t: values) { > > > sum +=3D t.f1; > > > } > > > out.collect (new Integer(sum)); > > > } > > > }); > > > > If you consume all those value via an sink, the sink will run= > an the > > cluster. You can use .writeToSocket(...) as sink: > > =20 > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/= streaming_guide.html#data-sinks > > > > -Matthias > > > > > > On 01/18/2016 06:30 PM, Saiph Kappa wrote: > > > Hi, > > > > > > After performing a windowAll() on a DataStream[String], is > there any > > > method to collect and return an array with all Strings > within a window > > > (similar to .collect in Spark). > > > > > > I basically want to ship all strings in a window to a remot= e > server > > > through a socket, and want to use the same socket connectio= n > for all > > > strings that I send. The method .addSink iterates over all > > records, but > > > does the provided function runs on the flink client or on > the server? > > > > > > Thanks. > > > > >=20 >=20 --2Rv4UK0vRwrTPtGNXlrrep5RqVkmrMHih Content-Type: application/pgp-signature; name="signature.asc" Content-Description: OpenPGP digital signature Content-Disposition: attachment; filename="signature.asc" -----BEGIN PGP SIGNATURE----- Version: GnuPG v2 iQIcBAEBCAAGBQJWnizhAAoJEFCVK48prEZ4NScP/3y/cYDFx9gBUwjS0JI+g5gC jsHUxN+cFbIKV8XhmdFeTKTpphQIPK/1mwN9DZ3phoC4A3yB3FBcO6xq5YEuOgBW kg9b3uF0NP2oI1cfAyGLUe5qAk5IXCRBQIoAlQpdrQrsSGrbYBiq0ZuDLMjXr4Aq Yv6rPv9O5qDchWRHRyiGzlHMdDYD5D6YGfksBfchTKd05VTelUSNM5QrmHsT6t8l 78dYV3WPiw5JaV69ezLd9C8XKKbHofOFvIJu4vb6LHVf8IqrXvfkBosHaNN8grmk nSIT0+VQEoWHOxdRPKnfBU/F5DPbB/CDJaWaMW4Vsjc+gd0Tgd/gT3A2DRenu0dg qC+z/aLSP6/PcmEwjwGB8vQYgYKkM7rQYHqMnWR7IqFht31vI9Cpe50ATTGgjgEK CYa7sa0YeHmrvxt9voHBEQO5Kze3napQ9T5l8zDq8+floJojzThc8hr6PLrJwbt4 Pl9KEWJ920DsYyRNWsSchOL/4jUSvGH8eqnhBbMcc+Pqexhme9jeERLjvV6YmLgD +yrddfr8ViGTuo3StPMA6CeZYHrTpRYODLXRgFztISzrzYkijenSTumJJmj09OmD esnSM1zz3dzRn5IH+Lg8hQe1sYWXIuvh5utTb5mYwcDwddM1C6LckvDAQkrkxGAO 69pZM/+Jnf5xfVwYl36C =molM -----END PGP SIGNATURE----- --2Rv4UK0vRwrTPtGNXlrrep5RqVkmrMHih--