flink-user-zh mailing list archives

From jincheng sun <sunjincheng...@gmail.com>
Subject Re: [udf questions]
Date Thu, 26 Mar 2020 09:55:03 GMT
A fairly obvious problem is in the UDF definition: your sink table defines a STRING-typed field, but your SQL SELECTs the udf's result, so I think your UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure your udf actually returns a string, not a JSON object.
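
For reference, a minimal sketch of what the corrected declaration could look like (an assumption on my part that the parsed payload should simply be forwarded as a JSON string; json.dumps turns the dict that json.loads produces back into a str, matching the sink's STRING column):

from pyflink.table import DataTypes
from pyflink.table.udf import udf
import json

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def json_split(message):
    # json.loads yields a dict; re-serialize so the UDF returns a str,
    # matching the STRING column of flink_sinktable_ad_test.
    return json.dumps(json.loads(message))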

Best,
Jincheng


On Thu, Mar 26, 2020 at 5:24 PM WuPangang <wpangang1989@icloud.com> wrote:

> Data as below:
>  {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/
> down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/
> (PRA-AL00X; Android; Android OS ; 8.0.0; zh)
> ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/
> down-ddz.734399.com
> \\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/
> p12.jmstatic.com
> \\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”}
> Problem:
> The data is nested JSON, and the format of the core field message cannot
> be handled directly with the table api's JSON-related methods.
> My idea for a solution: handle it with a udf, using json.loads.
> Problem encountered in practice: after the job is submitted, bytes
> received and records received on the dashboard are both 0; the downstream
> is synced to Kafka, and consuming the downstream kafka topic shows no
> data either.
>
> Code as below:
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableSink, TableConfig, DataTypes
> from pyflink.table.descriptors import Schema, Kafka, Rowtime, ConnectTableDescriptor, CustomConnectorDescriptor, Json
> from pyflink.common import RestartStrategies
> from pyflink.table.udf import udf
> import json
>
> env = StreamExecutionEnvironment.get_execution_environment()
> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> ## checkpoint settings
> #env.enable_checkpointing(300000)
> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
> #env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
> #env.get_checkpoint_config().set_checkpoint_timeout(60000)
> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
> ## container settings
> env.set_parallelism(12)
> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
> ## use the blink api
> environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> table_env = StreamTableEnvironment.create(env, environment_settings=environment_settings)
>
>
> table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
> host STRING, \
> type STRING, \
> topicid STRING, \
> message STRING, \
> proctime as PROCTIME() \
> ) WITH ( \
>   'connector.type' = 'kafka',        \
>   'connector.version' = 'universal', \
>   'connector.topic' = 'advertise_module',  \
>   'connector.properties.zookeeper.connect' = 'localhost:2181', \
>   'connector.properties.bootstrap.servers' = 'localhost:9092', \
>   'connector.properties.group.id' = 'flink_1.10_test_source', \
>   'connector.startup-mode' = 'latest-offset', \
>   'format.type' = 'json', \
>   'format.derive-schema' = 'true' \
> )")
>
>
> table_env.sql_update("CREATE TABLE flink_sinktable_ad_test ( \
> message STRING \
> ) WITH ( \
>   'connector.type' = 'kafka',        \
>   'connector.version' = 'universal', \
>   'connector.topic' = 'recommend_user_concern_test',  \
>   'connector.properties.zookeeper.connect' = 'localhost:2181', \
>   'connector.properties.bootstrap.servers' = 'localhost:9092', \
>   'connector.properties.group.id' = 'flink_1.10_test_sink', \
>   'connector.startup-mode' = 'latest-offset', \
>   'format.type' = 'json', \
>   'connector.properties.retries' = '3', \
>   'connector.properties.update_mode' = 'append' \
> )")
>
> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW())
> def json_split(message):
>   return json.loads(message.replace('\"', '"').replace('"{"', '{"').replace('}"', '}'))
> table_env.register_function("json_split", json_split)
>
> table_env.sql_update("insert into flink_sinktable_ad_test \
>                         select \
>                         json_split(message) AS message\
>                         from \
>                         flink_sourcetable_ad_test \
>                         ")
> table_env.execute('flink_1.10_test')
>
>
