flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Datastream[Row] covert to table exception
Date Thu, 07 Jun 2018 06:28:59 GMT
Sorry, I didn't see your last mail. The code actually looks good. What
does `inputStream.getType` return if you print it to the console?

Timo

On 07.06.18 at 08:24, Timo Walther wrote:
> Hi,
>
> Row is a special data type: Flink cannot generate a serializer for it
> based on generics alone. By default, a DeserializationSchema uses
> reflection-based type analysis, which yields a GenericTypeInfo for Row.
> You need to override the getProducedType() method in
> WormholeDeserializationSchema and specify the type information (a
> RowTypeInfo) manually there.
>
> Hope this helps.
>
> Regards,
> Timo
>
> On 06.06.18 at 13:22, 孙森 wrote:
>> Hi,
>>
>> I've tried to specify such a schema when reading from Kafka and
>> converting the input stream to a table, but I got this exception:
>>
>>   * Exception in thread "main"
>>     org.apache.flink.table.api.TableException: An input of
>>     GenericTypeInfo cannot be converted to Table. Please specify the
>>     type of the input with a RowTypeInfo
>>
>> And the code here:
>>
>>
>> private def getSchemaMap(jsonSchema: String) = {
>>   val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
>>   val fields = umsSchema.fields_get
>>   val fieldNameList = ListBuffer.empty[String]
>>   val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
>>   fields.foreach { field =>
>>     fieldNameList.append(field.name)
>>     fieldTypeList.append(fieldTypeMatch(field.`type`))
>>   }
>>   println(fieldNameList)
>>   println(fieldTypeList)
>>   (fieldNameList.toArray, fieldTypeList.toArray)
>> }
>>
>> private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
>>   umsFieldType match {
>>     case STRING => Types.STRING
>>     case INT => Types.INT
>>     case LONG => Types.LONG
>>     case FLOAT => Types.FLOAT
>>     case DOUBLE => Types.DOUBLE
>>     case BOOLEAN => Types.BOOLEAN
>>     case DATE => Types.SQL_DATE
>>     case DATETIME => Types.SQL_TIMESTAMP
>>     case DECIMAL => Types.DECIMAL
>>   }
>> }
>> }
>>
>> val myConsumer: FlinkKafkaConsumer010[Row] =
>>   new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
>> val inputStream: DataStream[Row] = env.addSource(myConsumer)
>> val tableEnv = TableEnvironment.getTableEnvironment(env) // <-- exception here
>>
>>
>>
>> Thanks !
>> sen
>
>
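To make the advice above concrete, here is a minimal sketch of a DeserializationSchema[Row] that declares its produced type explicitly as a RowTypeInfo. The class name RowSchemaSketch and the comma-splitting in deserialize are placeholders invented for illustration; the real WormholeDeserializationSchema is not shown in this thread, so its constructor and parsing logic are assumptions. The field names and types are assumed to come from something like getSchemaMap above.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// Hypothetical stand-in for WormholeDeserializationSchema (not the real class).
class RowSchemaSketch(
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]])
  extends DeserializationSchema[Row] {

  // Placeholder parsing: splits the message on commas and stores each part
  // as a String. Real code must produce values that match fieldTypes.
  override def deserialize(message: Array[Byte]): Row = {
    val parts = new String(message, StandardCharsets.UTF_8).split(",", -1)
    val row = new Row(fieldNames.length)
    for (i <- fieldNames.indices) {
      row.setField(i, if (i < parts.length) parts(i) else null)
    }
    row
  }

  // An unbounded Kafka stream never signals end-of-stream.
  override def isEndOfStream(nextElement: Row): Boolean = false

  // The important part: return a RowTypeInfo built from the known schema
  // instead of letting Flink fall back to a GenericTypeInfo for Row.
  override def getProducedType(): TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)
}
```

With an all-String schema, `new RowSchemaSketch(Array("id", "name"), Array[TypeInformation[_]](Types.STRING, Types.STRING))` reports a RowTypeInfo with two fields. In the Scala API the stream's type may also be taken from the implicit TypeInformation in scope at `env.addSource`, so declaring `implicit val rowType: TypeInformation[Row] = schema.getProducedType()` before that call may be needed as well; treat that as an assumption to verify against your Flink version.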

