flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message
Date Fri, 17 Aug 2018 17:45:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584206#comment-16584206 ]

ASF GitHub Bot commented on FLINK-10119:
----------------------------------------

buptljy commented on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema
URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315
 
 
   I think the failure is not caused by this PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
>                 Key: FLINK-10119
>                 URL: https://issues.apache.org/jira/browse/FLINK-10119
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.1
>         Environment: None
>            Reporter: sean.miao
>            Assignee: buptljy
>            Priority: Major
>              Labels: pull-request-available
>
> We have been using Kafka010JsonTableSource to process JSON messages from Kafka, with
> checkpointing and an automatic restart strategy enabled.
> We found that a single message that is not valid JSON makes the job fail, and the restart
> strategy cannot bring it back up. This behavior preserves exactly-once or at-least-once
> processing, but it leaves the application unavailable, which has a large impact on us.
> The current code is:
> Class: JsonRowDeserializationSchema
> Method:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> I have now changed it to:
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     } catch (Throwable var4) {
>         message = this.objectMapper.writeValueAsBytes("{}");
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     }
> }
>  
> I think that data format errors are inevitable during network transmission, so can we
> add a new column to the table to hold records with an invalid format, as Spark SQL does?
>  
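
The request above (keep the job running and surface malformed input instead of failing) can be sketched without any Flink or Jackson dependency. This is only an illustration under stated assumptions, not the change made in the pull request: TolerantDeserializer, StrictParser, and Result are hypothetical names, and StrictParser stands in for the strict parsing step (readTree followed by convertRow in the issue). A record that fails to parse is returned with its raw text in a separate "corrupt record" slot, loosely in the spirit of the extra column the issue asks for.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: none of these names exist in Flink.
class TolerantDeserializer {

    /** Stand-in for the strict parsing step, which may throw on bad input. */
    @FunctionalInterface
    interface StrictParser<T> {
        T parse(byte[] message) throws Exception;
    }

    /** Holds either a parsed value or the raw text of a message that failed to parse. */
    static final class Result<T> {
        private final T value;
        private final String corruptRecord;

        private Result(T value, String corruptRecord) {
            this.value = value;
            this.corruptRecord = corruptRecord;
        }

        static <T> Result<T> ok(T value) {
            return new Result<>(value, null);
        }

        static <T> Result<T> corrupt(String raw) {
            return new Result<>(null, raw);
        }

        boolean isOk() {
            return corruptRecord == null;
        }

        T value() {
            return value;
        }

        String corruptRecord() {
            return corruptRecord;
        }
    }

    /** Runs the strict parser; on failure, keeps the raw input instead of throwing. */
    static <T> Result<T> deserialize(byte[] message, StrictParser<T> parser) {
        try {
            return Result.ok(parser.parse(message));
        } catch (Exception e) {
            // Preserve the offending input so it can be inspected downstream.
            String raw = message == null ? "" : new String(message, StandardCharsets.UTF_8);
            return Result.corrupt(raw);
        }
    }

    public static void main(String[] args) {
        // Toy parser used only for the demo: treats the payload as a decimal integer.
        StrictParser<Integer> parser =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        Result<Integer> good = deserialize("42".getBytes(StandardCharsets.UTF_8), parser);
        Result<Integer> bad = deserialize("not-a-number".getBytes(StandardCharsets.UTF_8), parser);

        System.out.println(good.isOk() + " " + good.value());
        System.out.println(bad.isOk() + " " + bad.corruptRecord());
    }
}
```

Compared with substituting an empty JSON document in the catch block, keeping the raw text means a malformed record stays distinguishable from a legitimately empty one. Whether such records should be dropped, counted, or written to a dedicated column is a policy choice that this sketch leaves open.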



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
