flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Flink Table API
Date Wed, 12 Feb 2020 16:15:05 GMT
Hi to all,
I was trying to use the new Table API with the new Blink planner, but I
found out that the two planners do not expose exactly the same APIs. For
example, I can't go back and forth between Tables and DataSet/DataStream
anymore (using tableEnv.toDataSet, for example).
Is this temporary behaviour, or will this functionality be removed sooner
or later (since, from what I understood, the current Flink planner will be
replaced by the Blink one)?
I like the idea of creating sources just by using properties, but then I
can't transform them into a DataSet/DataStream.
Also, the documentation seems quite messy about this transition phase...

This is my test program:

public static void main(String[] args) throws Exception {
  // set up the batch execution environment
  EnvironmentSettings bbSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
  TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

  String createTableSql = "CREATE TABLE civici (\n" //
    + "\t`X` DOUBLE,\n" //
    + "\t`Y` DOUBLE\n" //
    + ") WITH (\n"//
    + "\t'connector.type' = 'filesystem',\n"//
    + "\t'connector.path' = 'file:///tmp/test.csv',\n"//
    + "\t'format.type' = 'csv',\n" //
    + "\t'format.fields.0.name' = 'X',\n"//
    + "\t'format.fields.0.data-type' = 'DOUBLE',\n" //
    + "\t'format.fields.1.name' = 'Y',\n"//
    + "\t'format.fields.1.data-type' = 'DOUBLE',\n"//
    + "\t'format.field-delimiter' = ',',\n"//
//    + "\t'format.line-delimiter' = '\n',\n"//
    + "\t'format.quote-character' = '\"',\n"//
//    +"\t'format.allow-comments' = 'true',"//
//    +"\t'format.ignore-parse-errors' = 'true',"//
    + "\t'format.ignore-first-line' = 'true'\n"//
//    +"\t'format.array-element-delimiter' = '|', "//
//    +"\t'format.escape-character' = '\\',"//
//    +"\t'format.null-literal' = 'n/a'"//
    + ")";
  System.out.println(createTableSql);
  bbTableEnv.sqlUpdate(createTableSql);

  Table table = bbTableEnv.sqlQuery("SELECT * FROM civici");
  List<Row> rows = TableUtils.collectToList(table);

  for (Row row : rows) {
    System.out.println(row);
  }
}
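
The hand-concatenated DDL string in the program above could also be assembled from a map of properties, which keeps the "source defined just by properties" idea explicit. This is only a minimal sketch using the plain JDK: DdlBuilder and buildCreateTable are hypothetical names, not part of Flink, and property values are inserted verbatim without any escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Flink: assembles a CREATE TABLE statement
// from column definitions and connector/format properties.
final class DdlBuilder {

  static String buildCreateTable(String tableName,
                                 Map<String, String> columns,
                                 Map<String, String> properties) {
    // Columns: back-quoted name followed by its SQL type.
    StringJoiner cols = new StringJoiner(",\n");
    for (Map.Entry<String, String> c : columns.entrySet()) {
      cols.add("\t`" + c.getKey() + "` " + c.getValue());
    }
    // Properties: 'key' = 'value'. Values are copied verbatim (no escaping).
    StringJoiner props = new StringJoiner(",\n");
    for (Map.Entry<String, String> p : properties.entrySet()) {
      props.add("\t'" + p.getKey() + "' = '" + p.getValue() + "'");
    }
    return "CREATE TABLE " + tableName + " (\n" + cols + "\n) WITH (\n" + props + "\n)";
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order, so the DDL is deterministic.
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("X", "DOUBLE");
    columns.put("Y", "DOUBLE");

    Map<String, String> props = new LinkedHashMap<>();
    props.put("connector.type", "filesystem");
    props.put("connector.path", "file:///tmp/test.csv");
    props.put("format.type", "csv");
    props.put("format.fields.0.name", "X");
    props.put("format.fields.0.data-type", "DOUBLE");
    props.put("format.fields.1.name", "Y");
    props.put("format.fields.1.data-type", "DOUBLE");
    props.put("format.field-delimiter", ",");
    props.put("format.quote-character", "\"");
    props.put("format.ignore-first-line", "true");

    // The resulting string could be passed to bbTableEnv.sqlUpdate(...).
    System.out.println(buildCreateTable("civici", columns, props));
  }
}
```

The property keys are the same ones used in the test program; only the way the string is put together differs.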


Best,
Flavio