flink-user mailing list archives

From Aljoscha Krettek <aljoscha.kret...@gmail.com>
Subject Re: Flink Data Stream Union
Date Wed, 21 Oct 2015 14:17:45 GMT
So it is received in the filter but the print afterwards does not print?
> On 21 Oct 2015, at 15:49, Gayu <gaayuu@gmail.com> wrote:
> 
> The data does arrive on the second port, and I can see it being received in the filter class.
> It happens only on one specific machine on which I run the code.
> 
> 
> Yes, I forgot to include it in my post, but my program does call unionMessageStreams().
> 
> On Wed, Oct 21, 2015 at 9:39 AM, Aljoscha Krettek <aljoscha.krettek@gmail.com> wrote:
> Hi Gayu,
> could it be that no data ever arrives on the second input stream? Or that the filter filters out all messages?
> 
> Also, in the example you posted you forgot to call unionMessageStreams().
> 
> Cheers,
> Aljoscha
> 
> > On 21 Oct 2015, at 15:29, Till Rohrmann <trohrmann@apache.org> wrote:
> >
> > Can it be that you forgot to call unionMessageStreams in your main method?
> >
> > Cheers,
> > Till
> >
> >
> > On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <gaayuu@gmail.com> wrote:
> > Here is the strange behavior.
> >
> > The code below works on one machine but not on the other. It worked on my
> > laptop all of yesterday, but strangely it doesn't work on my desktop
> > today.
> >
> > Can anyone please let me know what the issue is?
> >
> >
> > public static void main(String[] args) throws Exception {
> >     try {
> >         final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         DataStream<String> msgDataStream1 = env
> >                 .addSource(new SocketSource(hostName1, port, '\n', -1))
> >                 .filter(new MessageFilter()).setParallelism(1);
> >         DataStream<String> msgDataStream2 = env
> >                 .addSource(new SocketSource(hostName2, port, '\n', -1))
> >                 .filter(new MessageFilter()).setParallelism(1);
> >
> >         env.execute("Stock stream");
> >
> >     } catch (Exception e) {
> >         System.err.println("Exception  = > " + e.getMessage());
> >         e.printStackTrace();
> >     }
> > }
> >
> > private static void unionMessageStreams(DataStream<String> msgDataStream1,
> >         DataStream<String> msgDataStream2) {
> >     try {
> >         DataStream<String> ds = msgDataStream1.union(msgDataStream2);
> >         ds.print();
> >     } catch (Exception e) {
> >         System.err.println("Exception in union Message Streams () = > " + e.getMessage());
> >     }
> > }
> >
> > Thanks
> >
> >
> 
> 
> 
> 
> -- 
> Gayu
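
Note on the snippet as posted: Till and Aljoscha both point out that main() never calls unionMessageStreams(), so the union and print steps are never attached to the job before env.execute() runs. The sketch below is a toy model in plain Java, not Flink's API. ToyEnv, register(), and the operator names are hypothetical stand-ins, used only to illustrate why a step added by a helper that is never called cannot be part of what gets executed. It says nothing about the machine-specific behavior Gayu describes, where the helper is reportedly called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a lazily built pipeline. This is NOT Flink's API; every name
// here is a hypothetical stand-in chosen for illustration.
public class LazyPipelineSketch {

    // Miniature "environment": it only remembers operators that were registered.
    static class ToyEnv {
        private final List<String> operators = new ArrayList<>();

        void register(String operatorName) {
            operators.add(operatorName);
        }

        // Returns the operators that would run. Anything never registered
        // before this call is simply absent from the plan.
        List<String> execute() {
            return new ArrayList<>(operators);
        }
    }

    // Plays the role of unionMessageStreams(): attaches the union and print step.
    static void unionAndPrint(ToyEnv env) {
        env.register("union->print");
    }

    // Mirrors the posted main(): the helper is defined but never called.
    static List<String> runWithoutUnion() {
        ToyEnv env = new ToyEnv();
        env.register("filter1");
        env.register("filter2");
        return env.execute();
    }

    // Same setup, but the helper is called before execute().
    static List<String> runWithUnion() {
        ToyEnv env = new ToyEnv();
        env.register("filter1");
        env.register("filter2");
        unionAndPrint(env);
        return env.execute();
    }

    public static void main(String[] args) {
        System.out.println(runWithoutUnion()); // [filter1, filter2]
        System.out.println(runWithUnion());    // [filter1, filter2, union->print]
    }
}
```

In the toy model the filters run in both cases, but the print step exists only when the helper is called before execute(). The same ordering concern applies to the posted program: the call to unionMessageStreams() has to come before env.execute("Stock stream").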

