flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ddwcg <3149768...@qq.com>
Subject Re: flink 1.9.0 StreamTableEnvironment 编译错误
Date Mon, 26 Aug 2019 03:37:01 GMT
感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
总是感觉 java api 和scala api有点混乱了



> 在 2019年8月26日,11:22,Zili Chen <wander4096@gmail.com> 写道:
> 
> 试试把
> 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> 
> 换成
> 
> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
> 
> 应该是意外 import 了不同包下的同名类的缘故
> 
> Best,
> tison.
> 
> 
> ddwcg <3149768603@qq.com> 于2019年8月26日周一 上午11:12写道:
> 
>> 大家好,
>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
>> 
>> import org.apache.flink.streaming.api.CheckpointingMode
>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>> import org.apache.flink.table.planner.expressions.StddevPop
>> import org.apache.kafka.clients.consumer.ConsumerConfig
>> import org.apache.kafka.clients.producer.ProducerConfig
>> 
>> object StreamingJob {
>>  def main(args: Array[String]) {
>>    val kafkaTopic = "source.kafka.topic"
>>    val jobName ="test"
>>    val parallelism =1
>>    val checkPointPath ="checkpoint/"
>>    val kafkaBrokers =""
>> 
>>    // set up the streaming execution environment
>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>    env.setParallelism(parallelism)
>>    env.enableCheckpointing(10000)
>>    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>    //env.setStateBackend(new FsStateBackend(checkPointPath))
>> 
>> 
>>    val tableEnv = StreamTableEnvironment.create(env)
>> 
>> 
>> 提示有多个实现:
>> 
>> 下面是pom文件:
>> 
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>compile</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>compile</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>provided</scope>
>> </dependency>
>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink
-->
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-runtime-blink_2.11</artifactId>
>>   <version>1.9.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka_2.11</artifactId>
>>   <version>1.9.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-common</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>provided</scope>
>> </dependency>
>> 
>> 
>> 
>> 


Mime
View raw message