flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?
Date Mon, 25 Sep 2017 08:22:10 GMT
Hi,

I also replied to your Stackoverflow question. I think the problem is 
that BillCount has the wrong type and is therefore treated as one single 
black box.

Haohui's suggestion will no work because the row type needs information 
about the fields.  The easiest thing is to figure out why BillCount has 
the wrong type. Make sure that it is defined in a statically.

What type is Record? Maybe you don't need the additional MapFunction but 
can use the Table API for mapping.

Regards,
Timo


Am 9/25/17 um 9:29 AM schrieb Haohui Mai:
> Hi,
>
> I think instead of generating DataStream[BillCount], the correct way 
> is to generate DataStream[Row], that is,
>
> kafkaInputStream.map(value -> Row.of(value.getLogis_id, 
> value.getProvince_id, value.getCity_id, 
> value.getOrder_require_varieties, value.getOrder_rec_amount, 
> value.getStore_rec_date.getTime)
>
> That should work.
>
> Regards,
> Haohui
>
>
>
> On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com 
> <mailto:laney0606@163.com> <laney0606@163.com 
> <mailto:laney0606@163.com>> wrote:
>
>     Hi,
>          I‘m confused about a problem, occuring a exception
>     "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field."
>
>      Both *BillCount *and***Record *are class object*.*Following is code.
>       case  class
>     *BillCount*(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
>     val kafkaInputStream: DataStream[*Record*] = env.addSource(source)
>       //source is FlinkKafkaConsumer010 source
>     val tbDataStream : DataStream[*BillCount*] = kafkaInputStream.map(
>      new MapFunction[Record, BillCount] {
>     override def map(value: *Record*) = {
>     *BillCount*(value.getLogis_id, value.getProvince_id, value.getCity_id,
>     value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
>             }
>           })
>      val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)
>           // occur error here
>
>         Error :
>     Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
>     at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
>     at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
>     at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>
>       Thanks.
>
>
>     ------------------------------------------------------------------------
>     laney0606@163.com <mailto:laney0606@163.com>
>
>
>     【网易自营|30天无忧退货】仅售同款价1/4!MUJI制造商“2017秋冬舒适家居拖鞋系列”限时仅34.9元>>
>     <http://you.163.com/item/detail?id=1165011&from=web_gg_mail_jiaobiao_9>
>
>


Mime
View raw message