flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: DataStreamUtils not working properly
Date Tue, 19 Jul 2016 12:49:01 GMT
Have you checked your logs whether they contain some problems? In general
it is not recommended collecting the streaming result back to your client.
It might also be a problem with `DataStreamUtils.collect`.

Cheers,
Till

On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasubash@gmail.com> wrote:

> Hello all,
>
> I tried to check if it works for tuple but same problem, the collection
> still shows blank result. I took the id of centroid tuple and printed it,
> but the collection displays empty.
>
> DataStream<Centroid> centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
> centroidId.print();
> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
> for (Tuple1<String> c : testCentroids) {
> System.out.println(c);
> }
> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
> but no output for System.out.println(c); Best Regards, Subash Basnet
>
> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasubash@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I am trying to convert datastream to collection, but it's shows blank
>> result. There is a stream of data which can be viewed on the console on
>> print(), but the collection of the same stream shows empty after
>> conversion. Below is the code:
>>
>> DataStream<Centroid> centroids = newCentroidDataStream.map(new
>> TupleCentroidConverter());
>> centroids.print();
>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>> for(Centroid c: testCentroids){
>> System.out.println(c);
>> }
>>
>> The above *centroids.print()* gives the following output in console:
>>
>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>
>> But the next *System.out.println(c) *within the for loop prints nothing.
>> What could be the problem.
>>
>> My maven has following configuration for dataStreamUtils:
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-contrib_2.10</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>>
>

Mime
View raw message