flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: how to write dataset in a file?
Date Sun, 22 Nov 2015 17:00:44 GMT
You can configure the system to always create a directory (not just when
parallelism > 1); see "fs.output.always-create-directory" under
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#file-systems
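
To make the rule concrete, here is a small plain-Java model of which paths end up on disk for a given sink parallelism, with and without the "fs.output.always-create-directory" setting. It is only a sketch of the behavior described in this thread, not Flink code: the class name, the method name and the boolean parameter are made up for illustration, and the numbered file names (1, 2, 3, ...) are taken from the report quoted below.

```java
import java.util.ArrayList;
import java.util.List;

public class OutputLayoutSketch {

    /**
     * Returns the paths a file sink would produce under the rules described
     * in this thread. Hypothetical helper, not part of any Flink API.
     */
    static List<String> outputPaths(String path, int parallelism, boolean alwaysCreateDirectory) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<String> paths = new ArrayList<>();
        if (parallelism == 1 && !alwaysCreateDirectory) {
            // Single writer with the default setting: the path itself is the file.
            paths.add(path);
            return paths;
        }
        // Otherwise the path is a directory holding one numbered file per writer.
        for (int task = 1; task <= parallelism; task++) {
            paths.add(path + "/" + task);
        }
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(outputPaths("/output1.txt", 1, false)); // [/output1.txt]
        System.out.println(outputPaths("/output1.txt", 3, false)); // [/output1.txt/1, /output1.txt/2, /output1.txt/3]
        System.out.println(outputPaths("/output1.txt", 1, true));  // [/output1.txt/1]
    }
}
```

The third call corresponds to the configuration option above: even a single writer produces a directory with one file in it.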

The behavior we support right now is pretty much what people coming from
the Hadoop world are used to, which is why it works the way it does.
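
As a rough illustration of that convention: a reader treats a path as one logical dataset whether it is a single file or a directory of part files. The sketch below shows the idea with plain java.nio only. It is not how Flink implements readTextFile, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadPartFilesSketch {

    /** Reads a single file, or every regular file directly inside a directory, as one list of lines. */
    static List<String> readAllLines(Path path) throws IOException {
        if (Files.isRegularFile(path)) {
            return Files.readAllLines(path);
        }
        List<Path> parts;
        try (Stream<Path> entries = Files.list(path)) {
            // Sorted by path name; note that "10" sorts before "2" in this simple sketch.
            parts = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Simulate a sink that wrote two part files into one output directory.
        Path dir = Files.createTempDirectory("output1");
        Files.write(dir.resolve("1"), Arrays.asList("[ERROR] a"));
        Files.write(dir.resolve("2"), Arrays.asList("[ERROR] b", "[ERROR] c"));
        System.out.println(readAllLines(dir));
    }
}
```

The caller passes the same path in both cases, which is why a directory of numbered files is usually not a problem for downstream jobs.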

Greetings,
Stephan


On Sun, Nov 22, 2015 at 8:49 AM, jun aoki <jaoki@apache.org> wrote:

> Thank you guys for helping me understand!
> With your help I was able to get exactly the behavior I needed for my
> research work.
>
> Does anybody think, however, that the behavior is not straightforward? (At
> least one other person on StackOverflow misunderstood it the same way I did.)
>
> I'd like to ask the community what they think of my suggestions:
> 1. Make the method signatures writeAsText(String directoryPath) and
> writeAsCsv(String directoryPath) (not filePath but directoryPath) and have
> them ALWAYS create a directory, instead of sometimes a file and sometimes a
> directory depending on the sink's parallelism.
> A directory containing a sole "1" file would be created even when
> parallelism is set to 1.
> This is more consistent, and there is no confusion about what the method does.
>
> 2. Create additional methods called writeAsTextFile(String filePath) and
> writeAsCsvFile(String filePath) which ALWAYS create a single file and no
> directory. To make this happen, either the sink's parallelism is
> implicitly set to 1, or all data from all workers is collected into one
> dataset behind the scenes.
>
> What do you guys think?
>
>
> On Sat, Nov 21, 2015 at 6:18 AM, Matthias J. Sax <mjsax@apache.org> wrote:
>
> > I would not set
> >
> > > ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
> > > env.setParallelism(1);
> >
> > because this changes the default parallelism of *all* operators to one.
> > Instead, only set the parallelism of the **sink** to one (as described
> > here:
> > https://stackoverflow.com/questions/32580970/writeascsv-and-writeastext-is-unexpected/32581813#32581813
> > )
> >
> > filteredData.writeAsText("file:///output1.txt").setParallelism(1);
> >
> > -Matthias
> >
> > On 11/21/2015 02:23 PM, Márton Balassi wrote:
> > > Additionally, as having multiple files under /output1.txt is standard in
> > > the Hadoop ecosystem, you can transparently read all the files with
> > > env.readTextFile("/output1.txt").
> > >
> > > You can also set parallelism on individual operators (e.g. the file
> > > writer) if you really need a single output.
> > >
> > > On Fri, Nov 20, 2015, 21:27 Suneel Marthi <smarthi@apache.org> wrote:
> > >
> > >> You can write to a single output file by setting the parallelism to 1:
> > >>
> > >> final ExecutionEnvironment env =
> > >>     ExecutionEnvironment.createLocalEnvironment();
> > >> env.setParallelism(1);
> > >>
> > >> You see multiple output files because each worker writes to a
> > >> different file.
> > >>
> > >> On Fri, Nov 20, 2015 at 10:06 PM, jun aoki <jaoki@apache.org> wrote:
> > >>
> > >>> Hi Flink community
> > >>>
> > >>> I know I'm mistaken but could not find what I want.
> > >>>
> > >>> final ExecutionEnvironment env =
> > >>> ExecutionEnvironment.createLocalEnvironment();
> > >>> DataSet<String> data = env.readTextFile("file:///text1.txt");
> > >>> // looks for lines that start with "[ERROR]"
> > >>> FilterFunction<String> filter = new MyFilterFunction();
> > >>> DataSet<String> filteredData = data.filter(filter);
> > >>> filteredData.writeAsText("file:///output1.txt");
> > >>> env.execute();
> > >>>
> > >>> Then I expect to get a single file /output1.txt, but actually get
> > >>> /output1.txt/1, /output1.txt/2, /output1.txt/3...
> > >>> I assumed I would get a single file because the method signature says
> > >>> writeAsText(String filePath).  <-- filePath instead of directoryPath
> > >>> Also, the Javadoc comment sounds like my assumption was right.
> > >>>
> > >>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java#L1354
> > >>>
> > >>> Can anyone tell me if the method signature and documentation should be
> > >>> fixed, or if I am missing some configuration?
> > >>>
> > >>> --
> > >>> -jun
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -jun
>
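
The second suggestion in the quoted message (a method that always yields one file) mentions collecting the data from all workers in one place. The same effect can be approximated after the fact by concatenating the numbered part files into a single file. The following is a plain-Java sketch of that post-processing step under the assumption that the part files sit directly inside one output directory. It does not use any Flink API, and the class and method names are invented for this example.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergePartsSketch {

    /** Concatenates every regular file in partsDir, in path order, into the target file. */
    static void mergeParts(Path partsDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> entries = Files.list(partsDir)) {
            parts = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (BufferedWriter out = Files.newBufferedWriter(target)) {
            for (Path part : parts) {
                // Copy the part line by line so each line ends with a newline.
                for (String line : Files.readAllLines(part)) {
                    out.write(line);
                    out.newLine();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("output1");
        Files.write(dir.resolve("1"), Arrays.asList("[ERROR] a"));
        Files.write(dir.resolve("2"), Arrays.asList("[ERROR] b"));
        Path merged = Files.createTempFile("merged", ".txt");
        mergeParts(dir, merged);
        System.out.println(Files.readAllLines(merged));
    }
}
```

Setting the sink's parallelism to 1, as recommended earlier in the thread, avoids this extra step at the cost of writing through a single task.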
