flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Radu Tudoran <radu.tudo...@huawei.com>
Subject RE: Flink writeAsCsv
Date Thu, 04 Feb 2016 12:49:18 GMT
Hi Radu,

It is indeed interesting to know how each window could be registered separately -  I am not
sure it any of the existing mechanisms in Flink support this.
I think you need to create your own output sink. It is a bit tricky to pass the window sequence
number (actually I do  not think such an index is kept – but you can create one by yourself).
Maybe an easier option is to manage the writing of the data yourself in the window function
or in a custom created evictor. In the window and in the evictor you have access to all data
and you can create specific files for each window triggered



From: Radu Prodan [mailto:raduprodan6@gmail.com]
Sent: Thursday, February 04, 2016 11:58 AM
To: user@flink.apache.org
Subject: Re: Flink writeAsCsv

Hi Marton,

Thanks to your comment I managed to get it worked. At least it outputs the results. However,
what I need is to output each window result seperately.  Now, it outputs the results of parallel
working windows (I think) and appends the new results to them. For example, If I have parallelism
of 10, then I will have at most 10 files and each file will grow in size as windows continue.
What I want is, to have seperate file for a window. For example, after n'th window is computed
output it to some file and close the file.

-best
Radu

On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi <balassi.marton@gmail.com<mailto:balassi.marton@gmail.com>>
wrote:
Hey Radu,

As you are using the streaming api I assume that you call env.execute() in both cases. Is
that the case?

Do you see any errors appearing? My first call would be if your data type is not a tuple type
then writeAsCsv does not work by default.

Best,

Marton

On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan <raduprodan6@gmail.com<mailto:raduprodan6@gmail.com>>
wrote:
Hi all,

I am new to flink. I wrote a simple program and I want it to output as csv file.


timeWindowAll(Time.of(3, TimeUnit.MINUTES))

.apply(newFunction1())

.writeAsCsv("file:///user/someuser/Documents/somefile.csv<file:///\\user\someuser\Documents\somefile.csv>");



When I change the sink to . print(), it works and outputs some results.

I want it to output the result of every window. However, it outputs nothing and the file is
not created. Am I missing anything?



-best

Radu







Mime
View raw message