flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "492341344" <www.liuxingjs....@qq.com>
Subject Temporal Table是否不支持left join,完整案例如下
Date Mon, 15 Apr 2019 07:19:24 GMT
import java.util.Collections

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.types.DataTypes
import org.apache.flink.table.sources.csv.CsvTableSource
import org.apache.flink.types.Row
import scala.collection.mutable

/**
  * Small reproduction program: joins an in-memory `Orders` stream against a
  * CSV-backed `LatestRates` table using `FOR SYSTEM_TIME AS OF` on the order's
  * processing-time attribute, and prints the resulting retract stream.
  */
object LateralJoinList {

  /**
    * Registers the `LatestRates` CSV source and the `Orders` table, runs the
    * given SQL against them, prints the result as a retract stream and
    * executes the job.
    *
    * @param sql query text to run; it may reference the tables `Orders`
    *            (fields `amount`, `currency`, `proctime`) and `LatestRates`
    *            (fields `currency`, `rate`)
    * @return the value returned by `env.execute()`
    */
  def procLateralJoinPrint(sql: String) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Expected contents of the CSV file (first line is a header):
    //    currency,rate
    //    US Dollar,102
    //    Euro,114
    //    Yen,1
    //    Euro,116
    val ratesSource = CsvTableSource
      .builder
      .path("/Users/xxx/Desktop/csv")
      .field("currency", DataTypes.STRING)
      .field("rate", DataTypes.DOUBLE)
      // `currency` is declared as the single unique key of the rates table.
      .uniqueKeys(Collections.singleton(Collections.singleton("currency")))
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .build

    // Build the order data as (amount, currency) pairs.
    val orders: Seq[(Int, String)] = Seq(
      (2, "Euro"),
      (1, "US Dollar"),
      (50, "Yen"),
      (3, "Euro"),
      // No rate exists for this currency. The original author noted it is
      // "not emit"; NOTE(review): confirm whether LEFT OUTER JOIN should
      // instead produce a row with a null rate here.
      (3, "Euroxxx")
    )

    // Expose the orders as a table with an appended processing-time attribute.
    val ordersTable = env
      .fromCollection(orders)
      .toTable(tableEnv, 'amount, 'currency, 'proctime.proctime)

    tableEnv.registerTableSource("LatestRates", ratesSource)
    tableEnv.registerTable("Orders", ordersTable)

    val result = tableEnv.sqlQuery(sql)
    val retractStream = result.toRetractStream[Row]
    retractStream.print()

    env.execute()
  }

  /**
    * Entry point: runs a LEFT OUTER JOIN of `Orders` against `LatestRates`
    * as of each order's processing time.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val query =
      """
        |SELECT
        |  o.amount, o.currency, r.rate, o.amount * r.rate
        |FROM
        |  Orders AS o
        |LEFT OUTER JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
        |ON o.currency = r.currency
      """.stripMargin

    procLateralJoinPrint(query)
  }
}
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message