flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yukun Guo <gyk....@gmail.com>
Subject Serialization problem for Guava's TreeMultimap
Date Sun, 18 Sep 2016 10:53:53 GMT
Here is the code snippet:

windowedStream.fold(TreeMultimap.<Long, String>create(), new
FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
    public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
                                           Tuple2<String, Long>
itemCount) throws Exception {
        String item = itemCount.f0;
        Long count = itemCount.f1;
        topKSoFar.put(count, item);
        if (topKSoFar.keySet().size() > topK) {
        return topKSoFar;

The problem is when fold function getting called, the initial value has
lost therefore it encounters a NullPointerException. This is due to failed
type extraction and serialization, as shown in the log message:
"INFO  TypeExtractor:1685 - No fields detected for class
com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. Will
be handled as GenericType."

I have tried the following two ways to fix it but neither of them worked:

1. Writing a class TreeMultimapSerializer which extends Kryo's
Serializer<T>, and calling
`env.addDefaultKryoSerializer(TreeMultimap.class, new
TreeMultimapSerializer()`. The write/read methods are almost line-by-line
translations from TreeMultimap's own implementation.

2. TreeMultimap has implemented Serializable interface so Kryo can fall
back to use the standard Java serialization. Since Kryo's JavaSerializer
itself is not serializable, I wrote an adapter to make it fit the
"addDefaultKryoSerializer" API.

Could you please give me some working examples for custom Kryo
serialization in Flink?

Best regards,

View raw message