flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Field could not be resolved by the field mapping when using kafka connector
Date Thu, 15 Nov 2018 09:53:45 GMT
This issue was already resolved in another thread by the same author.

On 15.11.2018 10:52, Dominik Wosiński wrote:
> Hey,
>
> Could you please share some sample data that you want to process? This
> would help in verifying the issue.
>
> Best Regards,
> Dom.
>
> On Tue, 13 Nov 2018 at 13:58, Jeff Zhang <zjffdu@gmail.com> wrote:
>
>     Hi,
>
>     I hit the following error when I try to use the Kafka connector with
>     the Flink Table API. There is very little documentation on how to use
>     the Kafka connector with the Table API. Could anyone help me with that?
>     Thanks
>
>     Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'event_ts' could not be resolved by the field mapping.
>     at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>     at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>     at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>     at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>     at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>     at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
>     And here's the source code:
>
>       case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>       def main(args: Array[String]): Unit = {
>         val senv = StreamExecutionEnvironment.getExecutionEnvironment
>         senv.setParallelism(1)
>         senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>         val data: DataStream[Record] = ...
>         val tEnv = TableEnvironment.getTableEnvironment(senv)
>         tEnv
>           // declare the external system to connect to
>           .connect(
>             new Kafka()
>               .version("0.11")
>               .topic("processed5.events")
>               .startFromEarliest()
>               .property("zookeeper.connect", "localhost:2181")
>               .property("bootstrap.servers", "localhost:9092"))
>           .withFormat(new Json()
>             .failOnMissingField(false)
>             .deriveSchema()
>           )
>           .withSchema(
>             new Schema()
>               .field("status", Types.STRING)
>               .field("direction", Types.STRING)
>               .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>                 new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>           )
>           // specify the update-mode for streaming tables
>           .inAppendMode()
>           // register as source, sink, or both and under a name
>           .registerTableSourceAndSink("MyUserTable")
>
>         tEnv.fromDataStream(data).insertInto("MyUserTable")
>       }
>

