flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jincheng sun <sunjincheng...@gmail.com>
Subject Re: 【PyFlink】对于数据以Csv()格式写入kafka报错,以及使用python udf时无法启动udf
Date Fri, 20 Mar 2020 14:34:19 GMT
彭哲夫,你好:

你是如何安装的beam,安装的版本是多少?如果是1.10 需要apache-beam 2.15,如果是
master需要apache-beam
2.19.

BTW,  为了共享你的问题,我将你的问题发到了中文用户列表里面,我们大家一起讨论。

Best,
Jincheng


Zhefu PENG <pengzf0802@gmail.com> 于2020年3月20日周五 下午5:12写道:

> Hi Jincheng,
>
> 针对昨天提到的第二点,我做了两个思路:1. 将我的pyflink.py以本地模式运行,2.对集群节点进行环境配置
>
> 1.
> 在本地模式中,我们发现在装apache-beam包的时候出于某些原因没有装全,少了_bz2模块,再补充上之后,pyflink.py的脚本可以正常运行,其中的udf功能也能正常使用。
>
> 2.
> 在本地模式运行成功的基础上,我们根据你的建议,对所有的worker节点进行了环境的更新,都更新到了python3.6以及安装了apache_beam和apache-flink.
> 但是以集群模式运行带有udf功能的脚本仍然报错,尝试谷歌搜索以后也没有搜到相关解答,在附件附上错误日志,希望能得到帮助(因为本地模式已经成功所以就不附带代码了),非常感谢!
>
> 期待您的回复
> 彭哲夫
>
>
> Zhefu PENG <pengzf0802@gmail.com> 于2020年3月19日周四 下午11:14写道:
>
>> Hi Jincheng:
>>
>> 非常感谢你如此迅速而细致的回复!~
>>
>> 关于第一点:根据你的回复,我在flink的lib目录下增加flink-csv-1.10.0-sql-jar.jar包之后,运行成功。而第一个包我在之前浏览你博客中关于kafka的使用的demo(based
>> on flink 1.9)中有看到并下载,因此这里有个提议,或许你未来可以对于后续使用者补充
>> flink-csv-1.10.0-sql-jar.jar包的使用的必要性 :),但也有可能是我在查询学习时看漏了,但不管怎么说感谢你的帮助解决;
>>
>> 关于第二点:因为部门相关组织安排问题,我现在没有权限去worker节点上查询,但是针对这一点我有个好奇的地方:我目前只在启动脚本的主机上安装了python3.5+,
>> 并且除了udf功能外,我都能正常使用(比如sql本身就有的concat之类,或者add_columns()这种简单功能)。所以是不是我理解为,如果要使用pyflink的全部功能,应该是集群的环境都要是python3.5+?
>> 但是简单的功能,只要启动脚本的主机环境符合就够了?
>> 还是关于第二点,我刚刚又重新跑了一下脚本,本来是希望能获得和之前一样的错误日志发给我的mentor,但是发现这次报了新的问题:
>> java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.beam.sdk.options.PipelineOptionsFactory
>>     at
>> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
>>     at
>> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
>>     at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
>>     at
>> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
>>     at
>> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>
>>
>> 我猜测原因正如你提到的worker的环境不符标准,我会在明天上班后请同事帮忙check后,根据你的建议进行修改尝试。也希望能解答一下疑问,因为刚毕业参加工作,可能提的问题会显得比较低级,请见谅!
>>
>> 再次感谢你的回复,我会根据建议尽快进行错误修复
>> 彭哲夫
>>
>> jincheng sun <sunjincheng121@gmail.com> 于2020年3月19日周四 下午9:08写道:
>>
>>> 彭哲夫,你好:
>>>
>>> 你上面问题可能原因是:
>>>
>>> 1. pyflink默认不包含kafka connector的jar包和csv的格式JIR包,需要把这些jar包加到pyflink的lib目录下:
>>>
>>> $ PYFLINK_LIB=`python -c "import pyflink;import
>>> os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')"`
>>> $ cd $PYFLINK_LIB
>>> $ curl -O
>>> https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
>>> $ curl -O
>>> https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
>>>
>>> 2. 有可能的原因是worker上没有安装python3以上环境或者环境中没有安装apache-beam,可以尝试在worker机器上执行一下:
>>> python --version 检查python版本,同时执行 pip list 查看是否有apache-beam,如果没有,可以执行
>>> :python -m pip install apache-flink
>>>
>>> 期望对你有帮助,有问题我们持续沟通。
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>>
>>> Zhefu PENG <pengzf0802@gmail.com> 于2020年3月19日周四 下午8:13写道:
>>>
>>>> 你好:
>>>>
>>>>
>>>> 在网上看到了你的博客,关于你对pyflink的开发和推动深感敬佩。我们部门因为业务需要最近在调研使用flink相关,我写了个一个简单的demo想做体验和测试,但是遇到了两个问题(第二个问题是目前遇到的比较大的困难,第一个问题采取了规避策略:)):
>>>>
>>>> 1.
>>>> 当我把数据想以Csv格式输出到Kafka时,报错。(从社区文档我已经了解到应该用Csv()取代OldCsv(),并修改)。查看报错信息后我怀疑是因为缺少jar包导致(比如之前使用Json格式时候),但是从另一个文档中了解到csv格式应该是built-in的。目前采取了规避措施,采用json格式输出。
>>>>
>>>> 报错信息如下:
>>>>
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>> o62.insertInto.
>>>> : org.apache.flink.table.api.NoMatchingTableFactoryException: Could not
>>>> find a suitable table factory for
>>>> 'org.apache.flink.table.factories.SerializationSchemaFactory' in
>>>> the classpath.
>>>>
>>>> Reason: No factory supports all properties.
>>>>
>>>> The matching candidates:
>>>> org.apache.flink.formats.csv.CsvRowFormatFactory
>>>> Unsupported property keys:
>>>> format.fields.#.name
>>>> format.fields.#.data-type
>>>>
>>>> table注册部分代码如下:
>>>> table_env.connect(Kafka()
>>>>                   .version("universal")
>>>>                   .topic(kafka_write_topic)
>>>>                   .property(kafka_server, ','.join(kafka_server_list))
>>>>                   .property(kafka_zookeeper,
>>>> ','.join(kafka_server_list))) \
>>>>     .with_format(Csv()
>>>>                  .schema(DataTypes.ROW([DataTypes.FIELD("a",
>>>> DataTypes.STRING()),
>>>>                                         DataTypes.FIELD("b",
>>>> DataTypes.STRING()),
>>>>                                         DataTypes.FIELD("c",
>>>> DataTypes.STRING())
>>>>                                         ]))) \
>>>>     .with_schema(Schema()
>>>>                  .field("a", DataTypes.STRING())
>>>>                  .field("b", DataTypes.STRING())
>>>>                  .field("c", DataTypes.STRING())) \
>>>>     .create_temporary_table(table_name_output)
>>>>
>>>> 2.采用规避策略后,尝试使用python udf增加自定义的函数丰富功能。但是在按照给的demo中的步骤定义好udf函数后,
>>>> 才运行起来后,一段时间内会超时报错,猜测是因为pyflink-udf-runner.sh这个脚本没有被启用,
>>>> 但是在所依赖的opt/flink-python_2.11-1.10.0.jar的包内可以找到该脚本。
>>>> 报错信息如下:
>>>> 2020-03-19 17:02:23,273 INFO
>>>>  org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory
>>>>  - Still waiting for startup of environment
>>>> '/matrix/data/hadoop/data4/mapred/usercache/root/appcache/application_1584436506937_1231/python-dist-1e474e56-46a7-4ae6-bc4a-871ba86917a6/pyflink-udf-runner.sh'
>>>> for worker id 1
>>>>
>>>> 这部分代码如下:
>>>> @udf(input_types=[DataTypes.STRING()],
>>>>      result_type=DataTypes.STRING())
>>>> def if_length_enough(devid):
>>>>     res = devid + " func"
>>>>     return res
>>>>
>>>> table_env.register_function("if_length_enough", if_length_enough)
>>>>
>>>> table_env.from_path(table_name_input) \
>>>>     .select("a, b, if_length_enough(c)") \
>>>>     .insert_into(table_name_output)
>>>>
>>>>
>>>> 以上两个错误,困扰了一下午,希望能帮忙解答,非常感谢!
>>>> 期待您的回复。
>>>>
>>>> 彭哲夫
>>>>
>>>>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message