flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Can't get keyed messages from Kafka
Date Tue, 13 Jun 2017 15:45:43 GMT
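In getProducedType(), replace the implementation with the following (Flink uses the returned type information to pick a serializer for the emitted tuples, so it must not be null):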

return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
TypeExtractor.getForClass(CustomObject.class));
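
Separately, and only as a sketch: KeyedDeserializationSchema passes messageKey as null when a record has no key, and new String(messageKey) would then throw a NullPointerException of its own. Whether your topic contains such records is an assumption on my part. The helper below uses only the JDK; the names KeyedBytesDecoder, decodeUtf8 and decode are hypothetical and not part of Flink. It decodes with an explicit charset and tolerates a missing key.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not a Flink class: null-safe decoding of a keyed record.
class KeyedBytesDecoder {

    // Decodes bytes as UTF-8; returns null when the input is null (e.g. no key set).
    static String decodeUtf8(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    // Pairs the decoded key and message. SimpleImmutableEntry permits null keys.
    static Map.Entry<String, String> decode(byte[] messageKey, byte[] message) {
        return new SimpleImmutableEntry<>(decodeUtf8(messageKey), decodeUtf8(message));
    }

    public static void main(String[] args) {
        byte[] key = "sensor-1".getBytes(StandardCharsets.UTF_8);
        byte[] msg = "{\"value\": 3}".getBytes(StandardCharsets.UTF_8);

        // A record with a key, then one without.
        System.out.println(decode(key, msg));
        System.out.println(decode(null, msg));
    }
}
```

Inside deserialize() the same idea would replace the two new String(...) calls; the decoded message string can then be handed to the CustomObject constructor as before.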

On 13.06.2017 17:18, AndreaKinn wrote:
> Can I ask you to help me? I am trying to implement a CustomDeserializer.
> My Kafka messages are KeyedMessages in which both the key and the message are
> strings.
> I created a new class named CustomObject to manage the message string
> because it is more complex than a simple string.
>
>
> public class CustomDeserializer implements
> 		KeyedDeserializationSchema<Tuple2<String, CustomObject>> {
>
> 	@Override
> 	public boolean isEndOfStream(Tuple2<String, CustomObject> nextElement) {
> 		return false;
> 	}
>
> 	@Override
> 	public TypeInformation<Tuple2<String, CustomObject>> getProducedType() {
> 		return null;
> 	}
>
> 	@Override
> 	public Tuple2<String, CustomObject> deserialize(byte[] messageKey,
> 			byte[] message, String topic, int partition, long offset)
> 			throws IOException {
>
> 		String key = new String(messageKey);
> 		String msg = new String(message);
> 		CustomObject customObj = new CustomObject(msg);
>
> 		return new Tuple2<>(key, customObj);
> 	}
> }
>
> Questions:
>
> - I don't understand what the getProducedType method is or what it is for.
> - Which methods do I have to implement in my CustomObject class?
>
> My main:
>
> DataStream<Tuple2<String, CustomObject>> stream = env.addSource(new
> FlinkKafkaConsumer010<>("topicTest", new CustomDeserializer(),
> properties)).rebalance();
>
> stream.print();
>
> If I execute it I get a NullPointerException, so I imagine I am missing something in
> the CustomObject class: I have implemented just a toString() method.
>
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13702.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>

