flink-user mailing list archives

From françois lacombe <francois.laco...@dcbrain.com>
Subject Re: Filter columns of a csv file with Flink
Date Fri, 06 Jul 2018 15:32:02 GMT
Hi Hequn,

The Table API is really great.
I will use it, and will certainly love it, to solve the issues I mentioned before.

One follow-up question regarding the Table API:
I have my CSV files and the Avro schemas that describe them.
As my users can send erroneous files that are inconsistent with the schemas,
I want to check that each file's structure is right before processing it.
I see that CsvTableSource allows defining the CSV fields. Will it then check
that the columns actually exist in the file and throw an exception if not?

Or is there any other way in Apache Avro to check whether a CSV file is
consistent with a given schema?
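
To make the kind of check I have in mind concrete, here is a minimal sketch
in plain Java. It assumes the CSV files start with a header line and that the
expected column names have already been taken from the Avro schema (for
instance by collecting Schema.Field#name() over Schema#getFields()). The
class CsvHeaderCheck and its method are hypothetical names of mine, not part
of Flink or Avro, and the sketch says nothing about what CsvTableSource
itself validates.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink or Avro): compares a CSV header
// line with the column names expected by a schema.
class CsvHeaderCheck {

    // Returns the expected column names that do not appear in the header.
    // Assumption: header names are not quoted and contain no delimiter.
    static List<String> missingColumns(String headerLine, String delimiter,
                                       List<String> expectedFields) {
        List<String> missing = new ArrayList<>(expectedFields);
        if (headerLine == null) {
            // Empty file: every expected column is missing.
            return missing;
        }
        List<String> actual = new ArrayList<>();
        // Pattern.quote keeps delimiters such as "|" from being read as regex.
        for (String name : headerLine.split(Pattern.quote(delimiter), -1)) {
            actual.add(name.trim());
        }
        missing.removeAll(actual);
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for a real file; only the first line is read.
        BufferedReader csv = new BufferedReader(new StringReader("id;name\n1;foo\n"));
        List<String> expected = Arrays.asList("id", "name", "price");
        List<String> missing = missingColumns(csv.readLine(), ";", expected);
        if (!missing.isEmpty()) {
            System.out.println("Missing columns: " + missing);
        }
    }
}
```

A file would be rejected before any Flink job is submitted whenever the
returned list is not empty. Column order and value types are not checked
here.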

Many thanks for putting me on the Table API track :)

Best R

François Lacombe



2018-07-06 16:53 GMT+02:00 Hequn Cheng <chenghequn@gmail.com>:

> Hi francois,
>
> If I understand correctly, you can use sql or table-api to solve your
> problem.
> As you want to project only some of the columns from the source, a columnar
> storage format like parquet/orc would be efficient. Currently, an ORC table
> source is supported in Flink; you can find more details here[1]. There are
> also many other table sources[2] you can choose from. With a TableSource,
> you can read the data, register it as a Table, and do table operations
> through sql[3] or table-api[4].
>
> To make a json string from several columns, you can write a user defined
> function[5].
>
> I also found an OrcTableSourceITCase[6], which I think may be helpful for
> you.
>
> Best, Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#orctablesource
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#table-sources-sinks
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html
> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
> [5] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html
> [6] https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
>
>
> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
> francois.lacombe@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I'm a new user in the Flink community. This tool sounds great for loading
>> files with millions of rows into a pgsql db for a new project.
>>
>> Reading the docs and examples, I can't find a proper use case of csv
>> loading into pgsql.
>> The file I want to load doesn't follow the same structure as the
>> table: I have to delete some columns and make a json string from several
>> others before loading into pgsql.
>>
>> I plan to use the Flink 1.5 Java API and a batch process.
>> Is the DataSet class able to strip some columns out of the records I
>> load, or should I iterate over each record to delete the columns?
>>
>> Same question for making a json string from several columns of the same
>> record,
>> e.g. json_column = {"field1":col1, "field2":col2...}
>>
>> I work with files of 20 million rows, and it sounds pretty inefficient to
>> iterate over each record.
>> Can someone tell me if it's possible, or if I have to change my mind about
>> this?
>>
>>
>> Thanks in advance, all the best
>>
>> François Lacombe
>>
>>
>
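
For the json_column question in the quoted thread, the per-record logic could
look roughly like the sketch below. It is plain Java with no Flink
dependency; JsonColumns and toJson are hypothetical names. To use it from the
Table API, as suggested with the user-defined function in [5], I assume the
same logic would be wrapped in the eval method of a ScalarFunction.

```java
// Hypothetical helper: builds a JSON object string from column names and
// the values of one record. Nested objects and arrays are not handled.
class JsonColumns {

    static String toJson(String[] names, Object[] values) {
        if (names.length != values.length) {
            throw new IllegalArgumentException("names and values differ in length");
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < names.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(names[i])).append("\":");
            Object v = values[i];
            if (v == null || isNonFinite(v)) {
                // JSON has no NaN or Infinity, so these are written as null.
                sb.append("null");
            } else if (v instanceof Number || v instanceof Boolean) {
                sb.append(v);
            } else {
                sb.append('"').append(escape(v.toString())).append('"');
            }
        }
        return sb.append('}').toString();
    }

    private static boolean isNonFinite(Object v) {
        return (v instanceof Double && (((Double) v).isNaN() || ((Double) v).isInfinite()))
            || (v instanceof Float && (((Float) v).isNaN() || ((Float) v).isInfinite()));
    }

    // Escapes quotes, backslashes and control characters for a JSON string.
    private static String escape(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(new String[] {"field1", "field2"},
                                  new Object[] {1, "abc"}));
    }
}
```

The function is applied once per record, so a full pass over the file is
still needed; the framework only distributes that pass.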
