From: subash basnet <yasubash@gmail.com>
Date: Wed, 20 Jul 2016 15:38:26 +0200
Subject: Re: DataStreamUtils not working properly
To: user@flink.apache.org

hello maximilian,

Thanks! I learned a new thing today :). But my problem still exists. Your example has little data and it works fine, but in my datastream I have set a timeWindow of Time.seconds(5). What I found out is, if I print directly as in your example:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
List<Centroid> testCentroids = new ArrayList<Centroid>();
while (iter.hasNext()) {
    System.out.println(iter.next());
}

it prints the result in a streaming manner. But if I instead collect into an ArrayList and print as below:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
List<Centroid> testCentroids = new ArrayList<Centroid>();
while (iter.hasNext()) {
    testCentroids.add(iter.next());
}
for (Centroid centroid : testCentroids) {
    System.out.println(centroid);
}

it waits, I guess until the whole stream has been collected into the ArrayList, and finally prints all the values at once. I waited roughly two minutes, then the ArrayList got printed and the program ended automatically, along with some exception messages. Why does this ArrayList collection wait until a huge collection from the multiple input streams of centroids gets printed at once? What could be the issue? It printed the following exceptions along with all the items in the ArrayList:

Exception in thread "Thread-1" java.lang.RuntimeException: Exception in execute()
    at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1195)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)

Best Regards,
Subash Basnet

On Wed, Jul 20, 2016 at 2:05 PM, Maximilian Michels <mxm@apache.org> wrote:
> Ah, now I see where the problem lies. You're reusing the Iterator
> which you have already used in the for loop. You can only iterate over
> the elements once! This is the nature of the Java Iterator, and
> DataStreamUtils.collect(..) returns an iterator.
>
> On Wed, Jul 20, 2016 at 1:11 PM, subash basnet <yasubash@gmail.com> wrote:
> >
> > Hello Maximilian,
> >
> > Thanks for the update. Yup, it works in the example you gave. I checked with the collection also; it works. But not in my datastream case after the collection.
> >
> > DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
> > Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> > while (iter.hasNext()) {
> >     System.out.println(iter.next());
> > }
> > Collection<Centroid> testCentroids = Lists.newArrayList(iter);
> > for (Centroid c : testCentroids) {
> >     System.out.println(c);
> > }
> >
> > In the above code the while loop prints the result as below, but the next for loop after the collection gives blank.
> >
> > Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
> > Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
> > Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
> > Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0
> >
> > Not sure what the problem is, as after collection it gives a blank result in my case but works in the example you gave. Below is my newCentroidDataStream:
> >
> > @SuppressWarnings("serial")
> > DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
> >     .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
> >         @Override
> >         public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
> >             Double[] columns = new Double[5]; // close, high, low, open, volume
> >             columns[0] = value.getClose();
> >             columns[1] = value.getHigh();
> >             columns[2] = value.getLow();
> >             columns[3] = value.getOpen();
> >             columns[4] = (double) value.getVolume();
> >             return (new Tuple2<String, Double[]>(value.getId(), columns));
> >         }
> >     });
> >
> > Regards,
> > Subash Basnet
> >
> > On Wed, Jul 20, 2016 at 12:20 PM, Maximilian Michels <mxm@apache.org> wrote:
> >>
> >> Just tried the following and it worked:
> >>
> >> public static void main(String[] args) throws IOException {
> >>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>
> >>     final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
> >>     source.print();
> >>
> >>     final Iterator<Integer> iter = DataStreamUtils.collect(source);
> >>     while (iter.hasNext()) {
> >>         System.out.println(iter.next());
> >>     }
> >> }
> >>
> >> It prints:
> >>
> >> 1
> >> 2
> >> 3
> >> 4
> >> 2> 2
> >> 1> 1
> >> 4> 4
> >> 3> 3
> >>
> >> However, the collect util needs some improvements. It assumes that the machine running the code is reachable on a random port by the Flink cluster. If you have any firewalls, then this might not work.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <yasubash@gmail.com> wrote:
> >>>
> >>> Hello Till,
> >>>
> >>> Yup, I can see the log output in my console, but there is no information there on whether there is any error in the conversion. Just normal warn and info as below:
> >>>
> >>> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> >>> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> >>>
> >>> The above message is always there when I run my project. It would be great if someone could check why the collection of the datastream via DataStreamUtils gives an empty result.
> >>>
> >>> Best Regards,
> >>> Subash Basnet
> >>>
> >>> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <trohrmann@apache.org> wrote:
> >>>>
> >>>> It depends on whether you have a log4j.properties file specified in your classpath. If you see log output on the console, then it should also print errors there.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <yasubash@gmail.com> wrote:
> >>>>>
> >>>>> Hello Till,
> >>>>>
> >>>>> Shouldn't it write something to the Eclipse console if there is any error or warning? But nothing about an error is printed on the console. And I checked the Flink project folders (flink-core, flink-streaming and such) but couldn't find where the log is written when run via Eclipse.
> >>>>>
> >>>>> Best Regards,
> >>>>> Subash Basnet
> >>>>>
> >>>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <trohrmann@apache.org> wrote:
> >>>>>>
> >>>>>> Have you checked your logs whether they contain some problems? In general it is not recommended to collect the streaming result back to your client. It might also be a problem with `DataStreamUtils.collect`.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Till
> >>>>>>
> >>>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasubash@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hello all,
> >>>>>>>
> >>>>>>> I tried to check whether it works for a tuple, but same problem: the collection still shows a blank result. I took the id of the centroid tuple and printed it, but the collection displays empty.
> >>>>>>>
> >>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
> >>>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
> >>>>>>> centroidId.print();
> >>>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
> >>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
> >>>>>>> for (Tuple1<String> c : testCentroids) {
> >>>>>>>     System.out.println(c);
> >>>>>>> }
> >>>>>>>
> >>>>>>> Output for centroidId.print():
> >>>>>>> (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016)
> >>>>>>> but no output for System.out.println(c);
> >>>>>>>
> >>>>>>> Best Regards,
> >>>>>>> Subash Basnet
> >>>>>>>
> >>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasubash@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hello all,
> >>>>>>>>
> >>>>>>>> I am trying to convert a datastream to a collection, but it shows a blank result. There is a stream of data which can be viewed on the console with print(), but the collection of the same stream shows empty after conversion. Below is the code:
> >>>>>>>>
> >>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
> >>>>>>>> centroids.print();
> >>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> >>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
> >>>>>>>> for (Centroid c : testCentroids) {
> >>>>>>>>     System.out.println(c);
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> The above centroids.print() gives the following output in the console:
> >>>>>>>>
> >>>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
> >>>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
> >>>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
> >>>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
> >>>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
> >>>>>>>>
> >>>>>>>> But the next System.out.println(c) within the for loop prints nothing. What could be the problem?
> >>>>>>>>
> >>>>>>>> My maven has the following configuration for DataStreamUtils:
> >>>>>>>>
> >>>>>>>> <dependency>
> >>>>>>>>     <groupId>org.apache.flink</groupId>
> >>>>>>>>     <artifactId>flink-streaming-contrib_2.10</artifactId>
> >>>>>>>>     <version>${flink.version}</version>
> >>>>>>>> </dependency>
> >>>>>>>>
> >>>>>>>> Best Regards,
> >>>>>>>> Subash Basnet
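[Editor's note] The root cause Maximilian identifies, that a Java Iterator can be traversed only once, can be reproduced without Flink at all. The sketch below is a hypothetical standalone demo (class and method names are illustrative, plain java.util only): the first loop drains the iterator, so a second pass over the same iterator, as Lists.newArrayList(iter) does in the thread, collects nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorReuseDemo {

    /**
     * Drains the iterator once (like the while loop in the thread), then
     * tries to collect the leftovers (like Lists.newArrayList(iter)).
     * Returns whatever the second pass managed to gather.
     */
    static List<String> printThenCollect(Iterator<String> iter) {
        // First pass: prints every element and exhausts the iterator.
        while (iter.hasNext()) {
            System.out.println(iter.next());
        }

        // Second pass over the SAME iterator: hasNext() is already false,
        // so nothing is added -- the "blank result" reported in the thread.
        List<String> collected = new ArrayList<>();
        while (iter.hasNext()) {
            collected.add(iter.next());
        }
        return collected;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("c1", "c2", "c3");
        List<String> collected = printThenCollect(data.iterator());
        // Prints c1, c2, c3 during the first pass, then an empty collection size.
        System.out.println("collected after reuse: " + collected.size());
    }
}
```

The fix is simply to traverse the iterator once, either printing as you go or collecting first and printing from the list, exactly as in Subash's second snippet at the top of the thread.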