Hello all,

I am trying to replicate the code in the Docs (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions)

But I keep getting the following exception:

The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.

This is what I have:

class MyCombinableGroupReducer
    extends GroupReduceFunction[(Double, Double), Double]
    with GroupCombineFunction[(Double, Double), (Double, Double)] {
    import collection.JavaConverters._
    override def reduce(
      in: java.lang.Iterable[(Double, Double)],
      out: Collector[Double]): Unit =
      {
        val r = in.asScala.reduce ( (a, b) =>  ///ERROR HAPPENS HERE
          (a._1, a._2 + b._2)
        )
        out.collect(r._1 + r._2)
      }

    override def combine(
      in: lang.Iterable[(Double, Double)],
      out: Collector[(Double, Double)]): Unit = {
      ???
    }
  }

Where am I transversing `in` a second time? may be is the call to `asScala`?

Bests

-- Alejandro Alcalde - elbauldelprogramador.com