flink-user mailing list archives

From nsengupta <sengupta.nirma...@gmail.com>
Subject Table API: java.sql.DateTime is not supported;
Date Sat, 04 Feb 2017 16:46:38 GMT
I am reading a bunch of records from a CSV file. A record looks like this:

"4/1/2014 0:11:00",40.769,-73.9549,"B02512"

I intend to treat these records as SQL rows and then process them.
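
Separately from which types the CSV reader supports, the pickup time in the record above is not in the JDBC escape format (yyyy-mm-dd hh:mm:ss) that java.sql.Timestamp.valueOf accepts. The plain-Scala sketch below (no Flink involved) shows the difference. The pattern M/d/yyyy H:mm:ss and the object name PickupTimeParsing are assumptions for illustration; the month-first order is inferred from the sample row and the apr14 file name.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object PickupTimeParsing {
  // Assumed layout of values such as "4/1/2014 0:11:00": month/day/year,
  // then a 24-hour clock whose hour is not zero-padded.
  val PickupFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // java.sql.Timestamp.valueOf only accepts "yyyy-[m]m-[d]d hh:mm:ss[.f...]",
  // so it throws IllegalArgumentException for the raw CSV value.
  def viaJdbcFormat(raw: String): Try[Timestamp] = Try(Timestamp.valueOf(raw))

  // Parse with the custom pattern first, then convert to java.sql.Timestamp.
  def viaCustomPattern(raw: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(raw, PickupFormat))

  def main(args: Array[String]): Unit = {
    val raw = "4/1/2014 0:11:00"
    println(viaJdbcFormat(raw).isFailure) // true
    println(viaCustomPattern(raw))        // 2014-04-01 00:11:00.0
  }
}
```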

Here's the code:
----------------------------------------
package org.nirmalya.exercise

import java.time.LocalDateTime

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.scala.table.TableConversions
import org.apache.flink.api.scala._
/**
  * Created by nirmalya on 4/2/17.
  */
object TrafficDataTrainer {

  def main(args: Array[String]): Unit = {

    case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double,
                    base: String)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val myDataStorePath = "/home/nirmalya/Downloads/Traffic"

    val csvTableSource = new CsvTableSource(
      myDataStorePath + "/traffic-raw-data-apr14.csv",
      Array("timeOfPickUp", "lat", "lon", "base"),
      Array[TypeInformation[_]](
        Types.TIMESTAMP,
        Types.DOUBLE,
        Types.DOUBLE,
        Types.STRING
      )
    )

    tableEnv.registerTableSource("TrafficData",csvTableSource)

    val trafficTable = tableEnv.scan("TrafficData")

    val result = trafficTable.select("timeOfPickUp,lat,lon,base")

    val trafficDataSet = new TableConversions(result).toDataSet[Trip]

    trafficDataSet.collect().take(10).foreach(println)
  }
}
----------------------------------------

At run time, the exception that is thrown is:

------------------------------------------------------
Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format.
	at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
	at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
	at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
	at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
	at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
	at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
	at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
	at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
	at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
	at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

------------------------------------------------------

I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the
check fails because the stated type is not one of the types the CSV input
format knows. However, the constructor of *CsvTableSource* accepts
/Types.DATE/ as well as /Types.TIMESTAMP/ (I tried both, and the exception
is the same).
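
One possible workaround, sketched under assumptions: declare the first column as Types.STRING in the CsvTableSource, so the CSV reader only has to produce a String, and convert that String afterwards. Because Types.TIMESTAMP corresponds to java.sql.Timestamp in the Table API, the sketch also gives Trip a java.sql.Timestamp field; a LocalDateTime field would presumably not match the table's TIMESTAMP column in toDataSet[Trip]. Only the conversion itself is shown, in plain Scala. TripConversion and toTrip are hypothetical names, the date pattern is assumed from the sample row, and the Flink call in the comment has not been verified against this Flink version.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Pickup time typed as java.sql.Timestamp, the Java class behind
// Types.TIMESTAMP, rather than java.time.LocalDateTime.
case class Trip(timeOfPickUp: Timestamp, lat: Double, lon: Double, base: String)

object TripConversion {
  // Assumed layout of the raw value, e.g. "4/1/2014 0:11:00".
  private val PickupFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // Builds a Trip from a row whose first column was read as Types.STRING.
  // Untested idea for the Flink side (hypothetical wiring):
  //   new TableConversions(result)
  //     .toDataSet[(String, Double, Double, String)]
  //     .map(TripConversion.toTrip _)
  def toTrip(row: (String, Double, Double, String)): Trip = {
    val (rawTime, lat, lon, base) = row
    // Drop surrounding double quotes in case the reader leaves them in place.
    val unquoted = rawTime.trim.stripPrefix("\"").stripSuffix("\"")
    val pickup = Timestamp.valueOf(LocalDateTime.parse(unquoted, PickupFormat))
    Trip(pickup, lat, lon, base)
  }

  def main(args: Array[String]): Unit =
    println(toTrip(("4/1/2014 0:11:00", 40.769, -73.9549, "B02512")))
}
```

Reading the column as a String keeps the CSV reader on a type it clearly supports and moves the date handling into ordinary Scala code, where the non-standard month/day/year layout can be parsed explicitly.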

Could someone please point out where I am going wrong?

-- Nirmalya







--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-java-sql-DateTime-is-not-supported-tp11439.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
