flink-user mailing list archives

From Yang Wang <danrtsey...@gmail.com>
Subject Re: [Question] How to use different filesystem between checkpoint data and user data sink
Date Wed, 18 Dec 2019 11:40:21 GMT
You could try the new plugin mechanism.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
put your filesystem-related jars in it.
Different plugins are loaded by separate classloaders to avoid conflicts.
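To illustrate only the isolation idea (this is not Flink's own plugin loader), here is a minimal plain-JDK sketch that builds one class loader per subdirectory of a plugins directory such as $FLINK_HOME/plugins/myhdfs. `PluginLoaders` and `loadersFor` are hypothetical names used for this example.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical illustration, NOT Flink's PluginManager: one isolated class
 * loader per subdirectory of a plugins directory.
 */
final class PluginLoaders {

    /** Returns subdirectory name -> class loader over the jars in that subdirectory. */
    static Map<String, URLClassLoader> loadersFor(File pluginsDir, ClassLoader parent)
            throws MalformedURLException {
        Map<String, URLClassLoader> loaders = new TreeMap<>();
        File[] subdirs = pluginsDir.listFiles(File::isDirectory);
        if (subdirs == null) {
            return loaders; // not a directory, or not readable
        }
        for (File dir : subdirs) {
            List<URL> jars = new ArrayList<>();
            File[] files = dir.listFiles((d, name) -> name.endsWith(".jar"));
            if (files != null) {
                for (File jar : files) {
                    jars.add(jar.toURI().toURL());
                }
            }
            // Each plugin gets its own loader, so identically named classes in
            // two plugins (e.g. two Hadoop client versions) do not clash.
            loaders.put(dir.getName(), new URLClassLoader(jars.toArray(new URL[0]), parent));
        }
        return loaders;
    }
}
```

Because every subdirectory gets its own `URLClassLoader`, two plugins can ship different versions of the same library without one shadowing the other. The sketch delegates parent-first; Flink's real plugin class loading has its own delegation rules, so treat this as a model rather than a drop-in.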


Best,
Yang

vino yang <yanghua1127@gmail.com> wrote on Wed, Dec 18, 2019 at 6:46 PM:

> Hi ouywl,
>
> >>    Thread.currentThread().getContextClassLoader();
>
> What does this statement mean in your program?
>
> In addition, can you share your implementation of the customized file
> system plugin and the related exception?
>
> Best,
> Vino
>
> ouywl <ouywl@139.com> wrote on Wed, Dec 18, 2019 at 4:59 PM:
>
>> Hi all,
>>     We have implemented a filesystem plugin to sink data to hdfs1, while
>> the YARN cluster that runs Flink uses hdfs2. When the job runs, the
>> JobManager uses the hdfs1 configuration to create its filesystem, so the
>> filesystem plugin conflicts with Flink's own components.
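One plausible reading of this conflict, assuming `FileSystemFactoryEnhance` reports the scheme `hdfs`: if filesystems are chosen by URI scheme alone, a custom `hdfs` factory takes over every hdfs:// path in the process, including checkpoint storage on hdfs2, not only the sink path on hdfs1. The hypothetical `SchemeRegistry` below models just that lookup rule in plain Java; it is not Flink's API.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical model of a scheme-keyed filesystem registry (not a Flink class).
 * A "factory" here just returns a label describing which filesystem handled the URI.
 */
final class SchemeRegistry {
    private final Map<String, Function<URI, String>> factories = new HashMap<>();

    /** A later registration for the same scheme replaces the earlier one. */
    void register(String scheme, Function<URI, String> factory) {
        factories.put(scheme, factory);
    }

    /** Looks up the factory by scheme only; the authority (cluster name) plays no role. */
    String open(String uri) {
        URI parsed = URI.create(uri);
        Function<URI, String> factory = factories.get(parsed.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("no filesystem for scheme " + parsed.getScheme());
        }
        return factory.apply(parsed);
    }
}
```

In this model, once a second `hdfs` factory is registered, both `hdfs://bdms-test/...` and any checkpoint path on the other cluster are served by it. One way to keep them apart in the model is to give the custom factory a scheme of its own and use that scheme only in the sink path; whether that fits the real deployment is an assumption to check against the Flink filesystem documentation.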
>>     Our implementation steps:
>>       1. ‘FileSystemEnhance’ extends “FileSystem”.
>>       2. ‘FileSystemFactoryEnhance’ implements “FileSystemFactory” and adds
>>          Kerberos authentication.
>>       3. Add a service entry: create a file
>>          META-INF/services/org.apache.flink.core.fs.FileSystemFactory that
>>          contains the fully qualified class name of “FileSystemFactoryEnhance”.
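Step 3 relies on Java's `ServiceLoader` convention: the provider-configuration file is named after the fully qualified interface name, and each non-comment line holds the fully qualified name of one implementation class. The hypothetical helper below reads those entries from a given class loader, roughly the way `ServiceLoader` does, which can help check which loader (job jar, lib/, or a plugin directory) actually sees the `FileSystemFactory` entry. `ServiceEntries` is an illustrative name, and any package such as `com.example.fs` is a placeholder.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper: lists provider class names visible to one class loader. */
final class ServiceEntries {
    static final String FACTORY_SERVICE =
            "META-INF/services/org.apache.flink.core.fs.FileSystemFactory";

    static List<String> providerNames(ClassLoader loader) throws IOException {
        List<String> names = new ArrayList<>();
        // Every jar or directory visible to this loader may contribute one such file.
        Enumeration<URL> files = loader.getResources(FACTORY_SERVICE);
        while (files.hasMoreElements()) {
            URL file = files.nextElement();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    int hash = line.indexOf('#'); // '#' starts a comment
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty() && !names.contains(line)) {
                        names.add(line);
                    }
                }
            }
        }
        return names;
    }
}
```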
>>
>> The job's main class is:
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.enableCheckpointing(60 * 1000);
>>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>         env.getCheckpointConfig().enableExternalizedCheckpoints(
>>                 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>         env.getConfig().enableSysoutLogging();
>>
>>         Properties props = new Properties();
>>         props.put("bootstrap.servers", SERVERS);
>>         props.put("group.id", GROUPID);
>>         props.put("enable.auto.commit", "true");
>>         // props.put("auto.commit.interval.ms", "1000");
>>         props.put("session.timeout.ms", "30000");
>>         props.put("auto.offset.reset", "latest");
>>         // Kafka's StringDeserializer (not Jackson's). Flink's Kafka consumer
>>         // overrides both settings and decodes values with SimpleStringSchema.
>>         String stringDeserializer =
>>                 org.apache.kafka.common.serialization.StringDeserializer.class.getName();
>>         props.put("key.deserializer", stringDeserializer);
>>         props.put("value.deserializer", stringDeserializer);
>>
>>         FlinkKafkaConsumer010<String> consumer =
>>                 new FlinkKafkaConsumer010<>("zyf_test_2", new SimpleStringSchema(), props);
>>         DataStream<String> source = env.addSource(consumer).setParallelism(1);
>>         source.print();
>>
>>         // No effect on its own: the returned ClassLoader is discarded.
>>         Thread.currentThread().getContextClassLoader();
>>
>>         StreamingFileSink<String> sink = StreamingFileSink
>>                 .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"),
>>                         new SimpleStringEncoder<String>("UTF-8"))
>>                 .build();
>>         source.addSink(sink);
>>         env.execute();
>>     }
>>
>> When we start the job, the JobManager fails with a filesystem error. The log
>> indicates that the JobManager uses the filesystem created by
>> “FileSystemFactoryEnhance”, which conflicts with Flink's own. Following
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems,
>> how can we avoid using “Thread.currentThread().getContextClassLoader()”?
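On the question of what the statement does: in the job above, `Thread.currentThread().getContextClassLoader();` is an expression statement whose result is thrown away, so by itself it changes nothing. Code that resolves classes through the thread context class loader (Hadoop's `Configuration`, for instance, picks it up by default) is affected only when the loader is actually set. Below is a minimal plain-Java sketch of the usual set-and-restore pattern; `ContextClassLoaders.withContextClassLoader` is a hypothetical helper, not a Flink API.

```java
import java.util.function.Supplier;

/** Hypothetical helper: runs one action under a given context class loader. */
final class ContextClassLoaders {
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore the caller's loader even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}
```

For example, `withContextClassLoader(pluginLoader, () -> createFileSystem())` would run a single call under a plugin's loader and put the previous loader back afterwards; `pluginLoader` and `createFileSystem()` are placeholders here. Scoping the change this way keeps it from leaking into unrelated code running on the same thread.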
>>
>>
>> ouywl
>> ouywl@139.com
>>
>>
>>
