flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <wxchunj...@163.com>
Subject 实现 KafkaUpsertTableSink
Date Sat, 28 Mar 2020 09:38:08 GMT
各位大佬:

                由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
KafkaUpsertTableSink:

                KafkaUpsertTableSink

KafkaUpsertTableSinkBase

KafkaUpsertTableSourceSinkFactory

KafkaUpsertTableSourceSinkFactoryBase

MyKafkaValidator

但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
呢?

 


/**
* Searches for factories using Java service providers.
*
* @return all factories in the classpath
*/
private static List<TableFactory> discoverFactories(Optional<ClassLoader>
classLoader) {
   try {
      List<TableFactory> result = new LinkedList<>();
      ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
      ServiceLoader
         .load(TableFactory.class, cl)
         .iterator()
         .forEachRemaining(result::add);
      //todo add
      result.add(new KafkaUpsertTableSourceSinkFactory());
      return result;
   } catch (ServiceConfigurationError e) {
      LOG.error("Could not load service provider for table factories.", e);
      throw new TableException("Could not load service provider for table
factories.", e);
   }

}

 

 

直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
以成功运行的。

非常感谢

 

 

------------------

Thanks 

venn

 


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message