flink-user mailing list archives

From Ken Krugler <kkrugler_li...@transpac.com>
Subject Re: Error While Initializing S3A FileSystem
Date Wed, 15 May 2019 23:05:10 GMT
Hi Manish,

Are you sure this is an exception that’s actually killing the job?

Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 describes Commons BeanUtils
logging this exception as a warning, rather than throwing it up the stack.
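
If it is just log noise, one possible workaround is to raise the log level for that introspector so the
warning is suppressed. A sketch for log4j-console.properties (untested; the logger name below assumes the
shaded package shown in your stack trace):

log4j.logger.org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector=ERROR

Either way, it may be worth checking whether a different root-cause exception shows up further down in the
TaskManager logs before treating this one as fatal.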

— Ken

> On May 15, 2019, at 3:50 PM, Manish Bellani <manish.bellani@gmail.com> wrote:
> 
> Hey friends,
> 
> Thanks for all the work you have been doing on Flink. I have been trying to use BucketingSink
> (backed by S3AFileSystem) to write data to S3, and I'm running into some issues (which I suspect
> are dependency/packaging related) that I'll try to describe here.
> 
> The data pipeline is quite simple:
> 
> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
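> 
> For context, the job wiring is roughly the sketch below (class name, broker, topic, and bucket are
> illustrative placeholders, not my real config):
> 
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
> import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> 
> public class KafkaToS3Job {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
>         props.setProperty("group.id", "kafka-to-s3");         // placeholder group
> 
>         // Writing to an s3a:// path is what triggers the S3AFileSystem
>         // initialization that fails below.
>         BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/flink-output"); // placeholder bucket
>         sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
> 
>         env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props)) // placeholder topic
>            .addSink(sink);
>         env.execute("kafka-to-s3");
>     }
> }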
> Environment:
> 
> Kubernetes
> Debian
> DockerImage: flink:1.7.2-hadoop28-scala_2.11
> Java 1.8
> Hadoop Version: 2.8.5
> I followed the dependency section at https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
> to place the dependencies under /opt/flink/lib (with the exception that the Hadoop version and its
> dependencies I pull in are different).
> 
> Here are the dependencies I'm pulling in (excerpt from my Dockerfile):
> 
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
> RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
> RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> # Transitive dependencies of aws-java-sdk-s3
> RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
> RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
> RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
> RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
> RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
> RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
> RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
> RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar
>  
> But when I submit the job, it throws this error during initialization of BucketingSink/S3AFileSystem:
> 
> java.beans.IntrospectionException: bad write method arg count: public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
>     at java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
>     at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
>     at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:459)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:479)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:492)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResultInstance(BasicConfigurationBuilder.java:447)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResult(BasicConfigurationBuilder.java:417)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.getConfiguration(BasicConfigurationBuilder.java:285)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:119)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:98)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:251)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.registerAsMetricsSource(S3AInstrumentation.java:264)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:243)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:244)
>     at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1224)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> 
> Some googling about "bad write method arg count" suggests it could be related to a BeanUtils
> issue, but I'm not entirely sure. I've hunted through all the jars on the classpath:
> 
> /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Djavax.net.ssl.trustStore=/etc/ssl/java-certs/cacerts
> -XX:+UseG1GC -Xms5530M -Xmx5530M -XX:MaxDirectMemorySize=8388607T
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf:
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
>  
> and I see that `FluentPropertyBeanIntrospector` is contained within the following two jars:
> 
> flink-s3-fs-hadoop-1.7.2.jar: org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
> flink-shaded-hadoop2-uber-1.7.2.jar: org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
> 
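> (For anyone wanting to reproduce that scan, something along these lines turns those up; this
> assumes unzip is available in the container:)
> 
> for j in /opt/flink/lib/*.jar; do unzip -l "$j" | grep -q FluentPropertyBeanIntrospector && echo "$j"; done
> 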
> Both of those jars are packaged as part of the Flink distribution I'm using. At the moment I can't
> think of anything beyond some incompatible transitive dependency issue, so I would love advice from
> y'all on whether this is a packaging bug or something on my side.
> 
> Thanks
> 
> Manish
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

