flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From PedroMrChaves <pedro.mr.cha...@gmail.com>
Subject JsonMappingException: No content to map due to end-of-input
Date Thu, 13 Oct 2016 15:56:30 GMT
Hello,

I recently started programming with Apache Flink API. I am trying to get
input directly 
from kafka in a JSON format with the following code:

/private void kafkaConsumer(String server, String topic) {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", server);
		properties.setProperty("group.id", "Demo");
		stream = environment.addSource(new FlinkKafkaConsumer09<>(topic, new
JSONDeserializationSchema(), properties))
				.map(new MapFunction<ObjectNode, Event>() {
					@Override
					public Event map(ObjectNode value) throws Exception {
						return new Event(Integer.parseInt(value.get("id").asText()),
value.get("user").asText(),
								value.get("action").asText(), value.get("ip").asText());
					}
				});
	}/

But I alwys get the following error:


/17:56:46,335 ERROR org.apache.flink.runtime.taskmanager.Task                    
- Task execution failed.
com.fasterxml.jackson.databind.JsonMappingException: No content to map due
to end-of-input
 at [Source: [B@69a90966; line: 1, column: 1]
        at
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
        at
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3095)
        at
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3036)
        at
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
        at
org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:38)
        at
org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:30)
        at
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
        at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
        at java.lang.Thread.run(Thread.java:745)/

What am I doing wrong?

Attached follows the JSON sample that I am using. 

Thank you and Regards.

log.json
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9536/log.json>
 



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JsonMappingException-No-content-to-map-due-to-end-of-input-tp9536.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message