flink-user mailing list archives

From Haohui Mai <ricet...@gmail.com>
Subject Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?
Date Mon, 25 Sep 2017 07:29:08 GMT
Hi,

I think instead of generating DataStream[BillCount], the correct way is to
generate DataStream[Row], that is,

kafkaInputStream.map(value => Row.of(value.getLogis_id,
  value.getProvince_id, value.getCity_id, value.getOrder_require_varieties,
  value.getOrder_rec_amount, value.getStore_rec_date.getTime))

That should work.
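
A fuller sketch of the DataStream[Row] approach, for illustration only. It
assumes Flink 1.3-era Scala APIs. The tuple source is a hypothetical stand-in
for the Kafka stream, and the explicit RowTypeInfo is an assumption that is not
stated in this thread: without it, Row may be handled as a generic (atomic)
type and hit the same exception. The values are boxed explicitly so that the
call to Row.of does not depend on implicit conversions.

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object RowToTableSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tbEnv = TableEnvironment.getTableEnvironment(env)

    // Assumption: describe the Row layout explicitly so that the stream is
    // typed as a six-field row rather than as a single generic field.
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
      Array("logisId", "provinceId", "cityId", "orderRequVari",
        "orderRecAmount", "orderRecDate"))

    // Hypothetical stand-in for the Kafka source used in the original code.
    val input: DataStream[(Int, Int, Int, Int, Double, Long)] =
      env.fromElements((1, 10, 100, 3, 25.5, 1506324548000L))

    // Build one Row per record; the field order matches rowType above.
    val rows: DataStream[Row] = input.map { r =>
      Row.of(Int.box(r._1), Int.box(r._2), Int.box(r._3), Int.box(r._4),
        Double.box(r._5), Long.box(r._6))
    }

    // Convert to a Table with the same field names as in the question.
    val table = rows.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
      'orderRequVari, 'orderRecAmount, 'orderRecDate)

    table.toAppendStream[Row].print()
    env.execute("row-to-table-sketch")
  }
}
```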

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com <laney0606@163.com> wrote:

> Hi,
>      I'm confused by a problem. The following exception occurs:
> "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field."
>      Both BillCount and Record are classes. The code follows.
>
>        case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int,
>                             orderRecAmount: Double, orderRecDate: Long)
>
>        // source is a FlinkKafkaConsumer010
>        val kafkaInputStream: DataStream[Record] = env.addSource(source)
>        val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
>          new MapFunction[Record, BillCount] {
>            override def map(value: Record): BillCount = {
>              BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
>                value.getOrder_require_varieties, value.getOrder_rec_amount,
>                value.getStore_rec_date.getTime)
>            }
>          })
>
>        // the error occurs here
>        val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari,
>          'orderRecAmount, 'orderRecDate)
>
>
>     Error:
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
>     at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
>     at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
>     at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>   Thanks.
>
>
> ------------------------------
> laney0606@163.com
