From: Till Rohrmann
To: user@flink.apache.org
Subject: Re: Flink Data Stream Union
Date: Wed, 21 Oct 2015 15:29:31 +0200
In-Reply-To: <1445432540789-3196.post@n4.nabble.com>
References: <1445269349187-3169.post@n4.nabble.com> <1445432540789-3196.post@n4.nabble.com>

Can it be that you forgot to call unionMessageStreams in your main method?
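To illustrate why the missing call matters: a Flink program is assembled lazily, and execute() runs only the operators that were attached to the environment before it was called. The following is a toy model in plain Java, not Flink code; MiniEnv, MiniStream, collectInto and run are hypothetical names invented for this sketch, and the toy union simply concatenates its inputs, which says nothing about ordering in a real union.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for a lazily executed environment. NOT Flink's API:
// every class and method name here is hypothetical.
final class MiniEnv {
    // Sinks registered while the program is being assembled.
    private final List<Runnable> sinks = new ArrayList<>();

    MiniStream source(List<String> records) {
        return new MiniStream(this, records);
    }

    void register(Runnable sink) {
        sinks.add(sink);
    }

    // Runs only the sinks that were registered before this call.
    void execute() {
        sinks.forEach(Runnable::run);
    }
}

final class MiniStream {
    private final MiniEnv env;
    private final List<String> records;

    MiniStream(MiniEnv env, List<String> records) {
        this.env = env;
        this.records = records;
    }

    // Toy union: concatenates both inputs into a new stream.
    MiniStream union(MiniStream other) {
        List<String> merged = new ArrayList<>(records);
        merged.addAll(other.records);
        return new MiniStream(env, merged);
    }

    // Attaches a sink; nothing is copied until execute() runs.
    void collectInto(List<String> out) {
        env.register(() -> out.addAll(records));
    }
}

final class UnionSketch {
    // Mirrors the shape of the posted helper: union, then attach a sink.
    static void unionMessageStreams(MiniStream a, MiniStream b, List<String> out) {
        a.union(b).collectInto(out);
    }

    static List<String> run(boolean callUnion) {
        MiniEnv env = new MiniEnv();
        MiniStream s1 = env.source(Arrays.asList("a1", "a2"));
        MiniStream s2 = env.source(Arrays.asList("b1"));
        List<String> out = new ArrayList<>();
        if (callUnion) {
            unionMessageStreams(s1, s2, out); // must happen before execute()
        }
        env.execute();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints []
        System.out.println(run(true));  // prints [a1, a2, b1]
    }
}
```

Applied to the posted program, the equivalent change would be a call to unionMessageStreams(msgDataStream1, msgDataStream2) in main before env.execute("Stock stream").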

Cheers,
Till


On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <gaayuu@gmail.com> wrote:
Here is the strange behavior.

The code below works on one machine but not on the other. It worked on my laptop all day yesterday, but strangely it doesn't work on my desktop today.

Can anyone please let me know what the issue is.


public static void main(String[] args) throws Exception {
    try {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> msgDataStream1 = env
                .addSource(new SocketSource(hostName1, port, '\n', -1))
                .filter(new MessageFilter())
                .setParallelism(1);
        DataStream<String> msgDataStream2 = env
                .addSource(new SocketSource(hostName2, port, '\n', -1))
                .filter(new MessageFilter())
                .setParallelism(1);

        env.execute("Stock stream");

    } catch (Exception e) {
        System.err.println("Exception = > " + e.getMessage());
        e.printStackTrace();
    }
}

private static void unionMessageStreams(DataStream<String> msgDataStream1,
        DataStream<String> msgDataStream2) {
    try {
        DataStream<String> ds = msgDataStream1.union(msgDataStream2);
        ds.print();
    } catch (Exception e) {
        System.err.println("Exception in union Message Streams () = > " + e.getMessage());
    }
}

Thanks



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
