From: Stephan Ewen
To: user@flink.apache.org
Reply-To: user@flink.apache.org
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Subject: Re: Flink Data Stream Union
Date: Wed, 21 Oct 2015 18:47:46 +0200
References: <1445269349187-3169.post@n4.nabble.com> <1445432540789-3196.post@n4.nabble.com>
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a11455e925c5ef70522a0231d

--001a11455e925c5ef70522a0231d
Content-Type: text/plain; charset=UTF-8

I think the most crucial question is still whether you are running 0.9.1 or
0.10-SNAPSHOT, because the 0.9.1 union has known issues...

If you are running 0.9.1 there is not much you can do except upgrade the
version ;-)

On Wed, Oct 21, 2015 at 5:19 PM, Aljoscha Krettek wrote:

> Hi,
> first of all, am I correct to assume that
> new SocketSource(hostName1, port, '\n', -1)
> should be
> new SocketTextStreamFunction(hostName1, port1, '\n', -1)
>
> or are you using a custom built SocketSource for this?
>
> If I replace it by SocketTextStreamFunction and execute it the example
> runs and prints incoming Strings from both input sockets.
>
> How are you executing the example? In the IDE or on a Flink cluster?
>
> Cheers,
> Aljoscha
> > On 21 Oct 2015, at 15:02, flinkuser wrote:
> >
> > Here is the strange behavior.
> >
> > Below code works in one box but not in the other. I had it working in my
> > laptop the whole of yesterday, but strangely today it doesn't work in my
> > desktop.
> >
> > Can anyone please let me know what the issue is.
> >
> >
> > public static void main(String[] args) throws Exception {
> >         try {
> >             final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >             DataStream<String> msgDataStream1 = env.addSource((new
> >                 SocketSource(hostName1, port, '\n', -1))).filter(new
> >                 MessageFilter()).setParallelism(1);
> >             DataStream<String> msgDataStream2 = env.addSource((new
> >                 SocketSource(hostName2, port, '\n', -1))).filter(new
> >                 MessageFilter()).setParallelism(1);
> >
> >             env.execute("Stock stream");
> >
> >         } catch (Exception e) {
> >             System.err.println("Exception  = > " + e.getMessage());
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     private static void unionMessageStreams(DataStream<String> msgDataStream1,
> >             DataStream<String> msgDataStream2) {
> >         try {
> >
> >             DataStream<String> ds = msgDataStream1.union(msgDataStream2);
> >             ds.print();
> >         } catch (Exception e) {
> >             System.err.println("Exception in union Message Streams () = > " +
> >                 e.getMessage());
> >         }
> >     }
> >
> > Thanks
> >
> >
> >
> > --
> > View this message in context:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> > archive at Nabble.com.

--001a11455e925c5ef70522a0231d
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable

--001a11455e925c5ef70522a0231d--