flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "op" <520075...@qq.com>
Subject How to use Hbase Connector Sink
Date Thu, 11 Jun 2020 07:30:35 GMT
Hi,
I am using Flink 1.10. When I try to sink data to an HBase table like this:


bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
    rowkey String,
    info ROW<score double>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.table-name' = 'ms:test_circle_info',
    'connector.zookeeper.quorum' = 'localhost:2181',
    'connector.zookeeper.znode.parent' = '/hbase-secure',
    'connector.write.buffer-flush.max-size' = '10mb',
    'connector.write.buffer-flush.max-rows' = '1000',
    'connector.write.buffer-flush.interval' = '2s'
)""")

bstEnv.sqlUpdate(
  """
    |insert into circle_weight
    |select
    |concat_ws('_',circleName,dt) rowkey,
    |active_ratio*25 score
    |from tb""")
	


But I get the following exception. Can anybody tell me what is wrong?


Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink default_catalog.default_database.circle_weight do not
match.
Query schema: [rowkey: STRING, score: DOUBLE]
Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
	at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
	at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
Mime
View raw message