flink-user mailing list archives

From aj <ajainje...@gmail.com>
Subject Re: BucketingSink capabilities for DataSet API
Date Wed, 19 Feb 2020 16:15:37 GMT
Thanks, Timo. I have not used or explored the Table API until now; I have only
used the DataSet and DataStream APIs.
I will read about the Table API.

On Wed, Feb 19, 2020 at 4:33 PM Timo Walther <twalthr@apache.org> wrote:

> Hi Anuj,
>
> Another option would be to use the new Hive connectors. Have you looked
> into those? They might work on SQL internal data types, which is why you
> would need to use the Table API then.
>
> Maybe Bowen in CC can help you here.
>
> Regards,
> Timo
>
> On 19.02.20 11:14, Rafi Aroch wrote:
> > Hi Anuj,
> >
> > It's been a while since I wrote this (Flink 1.5.2). There could be a
> > better/newer way, but this is how I read & write Parquet with
> > hadoop-compatibility:
> >
> >     // imports
> >     import org.apache.avro.Schema;
> >     import org.apache.avro.generic.GenericRecord;
> >     import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> >     import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> >     import org.apache.flink.api.java.operators.DataSource;
> >     import org.apache.flink.api.java.tuple.Tuple2;
> >     import org.apache.flink.hadoopcompatibility.HadoopInputs;
> >     import org.apache.hadoop.conf.Configuration;
> >     import org.apache.hadoop.fs.Path;
> >     import org.apache.hadoop.mapreduce.Job;
> >     import org.apache.parquet.avro.AvroParquetInputFormat;
> >     import org.apache.parquet.avro.AvroParquetOutputFormat;
> >     import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> >
> >     // env, pathsToProcess, pathString, SomeEvent and resultDataSet
> >     // are defined elsewhere in the job.
> >
> >     // Creating Parquet input format
> >     Configuration conf = new Configuration();
> >     Job job = Job.getInstance(conf);
> >     AvroParquetInputFormat<GenericRecord> parquetInputFormat =
> >         new AvroParquetInputFormat<>();
> >     AvroParquetInputFormat.setInputDirRecursive(job, true);
> >     AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> >     HadoopInputFormat<Void, GenericRecord> inputFormat =
> >         HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> >             GenericRecord.class, job);
> >
> >     // Creating Parquet output format
> >     AvroParquetOutputFormat<GenericRecord> parquetOutputFormat =
> >         new AvroParquetOutputFormat<>();
> >     AvroParquetOutputFormat.setSchema(job,
> >         new Schema.Parser().parse(SomeEvent.SCHEMA));
> >     AvroParquetOutputFormat.setCompression(job,
> >         CompressionCodecName.SNAPPY);
> >     AvroParquetOutputFormat.setCompressOutput(job, true);
> >     AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> >     HadoopOutputFormat<Void, GenericRecord> outputFormat =
> >         new HadoopOutputFormat<>(parquetOutputFormat, job);
> >
> >     // Reading the Parquet input as (Void, GenericRecord) tuples
> >     DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
> >         env.createInput(inputFormat);
> >
> >     // Start processing...
> >
> >     // Writing result as Parquet
> >     resultDataSet.output(outputFormat);
> >
> >
> > Regarding writing partitioned data: as far as I know, there is no way to
> > achieve that with the DataSet API and hadoop-compatibility.
> >
> > You could implement this by reading the input files as a stream and
> > then using StreamingFileSink with a custom BucketAssigner [1].
> > The problem with that (which was not yet resolved, AFAIK) is described
> > here [2] in "Important Notice 2".
> >
> > Sadly, for this use case I eventually chose Spark to do the
> > job...
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> >
> > Hope this helps.
> >
> > Rafi
> >
> >
> > On Sat, Feb 15, 2020 at 5:03 PM aj <ajainjecrc@gmail.com
> > <mailto:ajainjecrc@gmail.com>> wrote:
> >
> >     Hi Rafi,
> >
> >     I have a similar use case: I want to read Parquet files with the
> >     DataSet API, perform some transformations, and write the result
> >     partitioned by year, month and day.
> >
> >     I am stuck at the first step: how to read and write
> >     Parquet files using hadoop-compatibility.
> >
> >     Please help me with this, and also let me know if you found a
> >     solution for writing partitioned data.
> >
> >     Thanks,
> >     Anuj
> >
> >     On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
> >     <andrey@data-artisans.com <mailto:andrey@data-artisans.com>> wrote:
> >
> >         Hi Rafi,
> >
> >         At the moment I do not see any support for Parquet in the DataSet API
> >         except HadoopOutputFormat, mentioned in the Stack Overflow question.
> >         I have cc’ed Fabian and Aljoscha, maybe they could provide more
> >         information.
> >
> >         Best,
> >         Andrey
> >
> >>         On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.aroch@gmail.com
> >>         <mailto:rafi.aroch@gmail.com>> wrote:
> >>
> >>         Hi,
> >>
> >>         I'm writing a Batch job which reads Parquet, does some
> >>         aggregations and writes back as Parquet files.
> >>         I would like the output to be partitioned by year, month, day
> >>         by event time, similar to the functionality of the
> >>         BucketingSink.
> >>
> >>         I was able to achieve the reading/writing to/from Parquet by
> >>         using the hadoop-compatibility features.
> >>         I couldn't find a way to partition the data by year, month,
> >>         day to create a folder hierarchy accordingly. Everything is
> >>         written to a single directory.
> >>
> >>         I could find an unanswered question about this issue:
> >>
> >>         https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
> >>
> >>         Can anyone suggest a way to achieve this? Maybe there's a way
> >>         to integrate the BucketingSink with the DataSet API? Another
> >>         solution?
> >>
> >>         Rafi
> >
> >
> >
> >     --
> >     Thanks & Regards,
> >     Anuj Jain
> >     Mob. : +91- 8588817877
> >     Skype : anuj.jain07
> >     <http://www.oracle.com/>
> >
> >
> >     <http://www.cse.iitm.ac.in/%7Eanujjain/>
> >
>
>
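
For reference, here is a minimal sketch of the year/month/day bucket naming that the custom BucketAssigner approach quoted above would need. It uses only the JDK so it can be run on its own. The class name EventTimeBuckets, the method bucketId, the Hive-style "year=/month=/day=" layout, and the choice of UTC are all assumptions for illustration; none of them comes from the thread or from Flink. In a real job, the returned string is the kind of value a BucketAssigner would return from getBucketId; that wiring is not shown here.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Flink): maps an event timestamp to a
// partition directory such as "year=2020/month=02/day=19".
final class EventTimeBuckets {

    // Assumption: partitions are computed in UTC; use another zone if the
    // event time should be interpreted in local time.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("'year='yyyy/'month='MM/'day='dd")
            .withZone(ZoneOffset.UTC);

    private EventTimeBuckets() {
    }

    // eventTimeMillis: event time as milliseconds since the Unix epoch.
    static String bucketId(long eventTimeMillis) {
        return FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        long eventTime = Instant.parse("2020-02-19T16:15:37Z").toEpochMilli();
        System.out.println(EventTimeBuckets.bucketId(eventTime));
    }
}
```

Keeping the naming logic in a plain static method makes it easy to unit test separately from the sink.
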

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
