flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Andres R. Masegosa " <and...@cs.aau.dk>
Subject Re: Bug broadcasting objects (serialization issue)
Date Thu, 03 Sep 2015 07:57:45 GMT
Hi,

I get a new similar bug when broadcasting a list of integers if this
list is made unmodifiable,

        elements = Collections.unmodifiableList(elements);


I include this code to reproduce the result,


public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        elements = Collections.unmodifiableList(elements);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String,
Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String,
Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
        }


    }
}

Thanks for your support,
Andres

On 2/9/15 11:17, Andres R. Masegosa  wrote:
> Hi,
> 
> I get a bug when trying to broadcast a list of integers created with the
> primitive "Arrays.asList(...)".
> 
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
> 
> 
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> 
>     DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
> 
>         List<Integer> elements = Arrays.asList(0, 0, 0);
> 
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> 
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
> 
>         wordCounts.print();
>     }
> 
>     public static class LineSplitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String,
> Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> 
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
> 
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList=integerList;
>         }
> 
> 
>     }
> }
> 
> 
> However, if instead of using the primitive "Arrays.asList(...)", we use
> instead the ArrayList<> constructor, there is any problem!!!!
> 
> 
> Regards,
> Andres
> 

Mime
View raw message