flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From subash basnet <yasub...@gmail.com>
Subject Re: DataStreamUtils not working properly
Date Wed, 20 Jul 2016 11:11:19 GMT
Hello Maximilian,

Thank's for the update. Yup it works in the example you gave. I checked
with collection also it works. But not in my datastream case after the
collection.
DataStream<Centroid> centroids = *newCentroidDataStream*.map(new
TupleCentroidConverter());
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
while (iter.hasNext()) {
System.out.println(iter.next());
}
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
System.out.println(c);
}

In the above code the while loop prints the result as below, but the next
for loop after the collection gives blank.

Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0

Not sure, what is the problem, as after collection it gives blank result in
my case but works in the example you gave. Below is my
*newCentroidDataStream: *
@SuppressWarnings("serial")
DataStream<Tuple2<String, Double[]>> newCentroidDataStream =
keyedEdits.timeWindow(Time.seconds(1))
.fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String,
Double[]>>() {
@Override
public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock
value) {
Double[] columns = new Double[5];// close,high,low,open,volume
columns[0] = value.getClose();
columns[1] = value.getHigh();
columns[2] = value.getLow();
columns[3] = value.getOpen();
columns[4] = (double) value.getVolume();
return (new Tuple2<String, Double[]>(value.getId(), columns));
}
});

Regards,
Subash Basnet

On Wed, Jul 20, 2016 at 12:20 PM, Maximilian Michels <mxm@apache.org> wrote:

> [image: Boxbe] <https://www.boxbe.com/overview> This message is eligible
> for Automatic Cleanup! (mxm@apache.org) Add cleanup rule
> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Fkey%3DdRAZZaqWm1E0eg03Nz70nMo%252BLvoi6a83GXx1GdehBK8%253D%26token%3Dr7zUjrLb%252Bdn4TKnSVvUcucALuVr3xW2ZBkG32vl5UP3Es2ZXH%252FL4tnqxS1KYL%252BwTpEEuNExnQBAfAagC2f6BdxVflVOjVFXE0%252BpXmTyy39vJbORhTElAP8ZPgcDvh44bRjOVoQaZN2Y%253D&tc_serial=26149764160&tc_rand=110900677&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
> | More info
> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=26149764160&tc_rand=110900677&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>
> Just tried the following and it worked:
>
> public static void main(String[] args) throws IOException {
>    StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>    final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
>    source.print();
>
>    final Iterator<Integer> iter = DataStreamUtils.collect(source);
>    while (iter.hasNext()) {
>       System.out.println(iter.next());
>    }
> }
>
> It prints:
>
> 1
> 2
> 3
> 4
> 2> 2
> 1> 1
> 4> 4
> 3> 3
>
> However, the collect util needs some improvements. It assumes that the
> machine running the code is reachable on a random port by the Flink
> cluster. If you have any firewalls, then this might not work.
>
> Cheers,
> Max
>
> On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <yasubash@gmail.com>
> wrote:
>
>> Hello Till,
>>
>> Yup I can see the log output in my console, but there is no information
>> there regarding if there is any error in conversion. Just normal warn and
>> info as below:
>> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>>         - No state backend has been specified, using default state backend
>> (Memory / JobManager)
>> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>>         - State backend is set to heap memory (checkpoint to jobmanager)
>>
>> The above message is always there when I run my project. It would be
>> great if someone could check why the collection of datastream via
>> DataStreamUtils is giving empty result.
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>
>>> It depends if you have a log4j.properties file specified in your
>>> classpath. If you see log output on the console, then it should also print
>>> errors there.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <yasubash@gmail.com>
>>> wrote:
>>>
>>>> Hello Till,
>>>>
>>>> Shouldn't it write something in the eclipse console if there is any
>>>> error or warning. But nothing about error is printed on the console. And
I
>>>> checked the flink project folder: flink-core, flink streaming as such but
>>>> couldn't find where the log is written when run via eclipse.
>>>>
>>>> Best Regards,
>>>> Subash Basnet
>>>>
>>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <trohrmann@apache.org>
>>>> wrote:
>>>>
>>>>> Have you checked your logs whether they contain some problems? In
>>>>> general it is not recommended collecting the streaming result back to
your
>>>>> client. It might also be a problem with `DataStreamUtils.collect`.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasubash@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I tried to check if it works for tuple but same problem, the
>>>>>> collection still shows blank result. I took the id of centroid tuple
and
>>>>>> printed it, but the collection displays empty.
>>>>>>
>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>>>>> TupleCentroidConverter());
>>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new
>>>>>> TestMethod());
>>>>>> centroidId.print();
>>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>>>>>> for (Tuple1<String> c : testCentroids) {
>>>>>> System.out.println(c);
>>>>>> }
>>>>>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST
>>>>>> 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016)
(Mon
>>>>>> Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for
>>>>>> centroidId.print(), but no output for System.out.println(c); Best
Regards,
>>>>>> Subash Basnet
>>>>>>
>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasubash@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I am trying to convert datastream to collection, but it's shows
>>>>>>> blank result. There is a stream of data which can be viewed on
the console
>>>>>>> on print(), but the collection of the same stream shows empty
after
>>>>>>> conversion. Below is the code:
>>>>>>>
>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>>>>>> TupleCentroidConverter());
>>>>>>> centroids.print();
>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>>>>>>> for(Centroid c: testCentroids){
>>>>>>> System.out.println(c);
>>>>>>> }
>>>>>>>
>>>>>>> The above *centroids.print()* gives the following output in console:
>>>>>>>
>>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38
27400.0
>>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37
48200.0
>>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265
>>>>>>> 50300.0
>>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741
37400.0
>>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455
>>>>>>> 152900.0
>>>>>>>
>>>>>>> But the next *System.out.println(c) *within the for loop prints
>>>>>>> nothing. What could be the problem.
>>>>>>>
>>>>>>> My maven has following configuration for dataStreamUtils:
>>>>>>> <dependency>
>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>> <artifactId>flink-streaming-contrib_2.10</artifactId>
>>>>>>> <version>${flink.version}</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Subash Basnet
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>

Mime
View raw message