flink-dev mailing list archives

From Cody Innowhere <e.neve...@gmail.com>
Subject How to run table api in 1.1-SNAPSHOT
Date Thu, 02 Jun 2016 11:56:13 GMT
Hi guys,
I'm trying to run the Table API on master using the sql and registerDataSet
methods of the TableEnvironment class.

According to the docs in table.md, after registering a table I should be
able to run a SQL query on the tableEnv, so I made a slight change to
WordCountTable.scala by adding two lines:

---------------------------------------------------------
object WordCountTable {

  case class WC(word: String, count: Int)

  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))
    val expr = input.toTable(tEnv)

    // *************** added lines ***************
    tEnv.registerDataSet("WC", input, 'word, 'count)
    val result1 = tEnv.sql("SELECT word FROM WC ")

    val result = expr
      .groupBy('word)
      .select('word, 'count.sum as 'count)
      .toDataSet[WC]

    result.print()
  }
}
---------------------------------------------------------

As you can see, the query is currently "SELECT word FROM WC", and it works.
But when I change the query to
"SELECT word, count FROM WC"
it fails with this exception:

Exception in thread "main"
org.apache.calcite.sql.parser.SqlParseException: Encountered "count FROM"
at line 1, column 13.
Was expecting one of:
  ...
  ...

Am I missing something?
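
A possible explanation, stated as an assumption rather than a confirmed diagnosis: COUNT is a reserved word in Calcite's SQL parser, so a bare column named count may be read as the start of an aggregate call. The sketch below sidesteps that by giving the second field a different name. The name frequency is illustrative only, and the import paths are assumed to match the 1.1-SNAPSHOT package layout.

```scala
// Assumed package layout for Flink 1.1-SNAPSHOT; adjust if your build differs.
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

object WordCountSqlSketch {

  // "frequency" is an illustrative replacement for "count",
  // chosen because it is not a SQL keyword.
  case class WC(word: String, frequency: Int)

  def main(args: Array[String]): Unit = {
    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))

    // register the DataSet under the table name "WC" with non-reserved field names
    tEnv.registerDataSet("WC", input, 'word, 'frequency)

    // the second column no longer collides with the COUNT keyword
    val result = tEnv.sql("SELECT word, frequency FROM WC")

    result.toDataSet[WC].print()
  }
}
```

If the field has to keep the name count, quoting the identifier in the query might also work, but the quoting character depends on how the parser is configured, so that would need checking against the current master.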

BTW, I read the doc at
https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/,
and I suppose Task2 has already been finished, right? Is somebody working
on Task3? Do we have a timeline for SQL on Flink?

Thanks~
