flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-3664] Create DataSetUtils method to easily summarize a DataSet of Tuples
Date Mon, 18 Apr 2016 14:47:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master fabc5f96e -> 7eb58773e


http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
new file mode 100644
index 0000000..5036123
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CompensatedSum}, checking that it behaves like Kahan (compensated) summation.
+ */
+public class CompensatedSumTest {
+
+	/**
+	 * When adding a series of numbers the order of the numbers should not impact the results.
+	 *
+	 * <p>This test shows that a naive summation comes up with a different result than Kahan
+	 * summation when you start with either a smaller or larger number in some cases, and
+	 * helps prove our Kahan summation is working.
+	 */
+	@Test
+	public void testAdd1() throws Exception {
+		final CompensatedSum smallSum = new CompensatedSum(0.001, 0.0);
+		final CompensatedSum largeSum = new CompensatedSum(1000, 0.0);
+
+		// series 1 starts with the small value, series 2 starts with the large value
+		CompensatedSum compensatedResult1 = smallSum;
+		CompensatedSum compensatedResult2 = largeSum;
+		double naiveResult1 = smallSum.value();
+		double naiveResult2 = largeSum.value();
+
+		for (int i = 0; i < 10; i++) {
+			compensatedResult1 = compensatedResult1.add(smallSum);
+			compensatedResult2 = compensatedResult2.add(smallSum);
+			naiveResult1 += smallSum.value();
+			naiveResult2 += smallSum.value();
+		}
+
+		// finish each series with the remaining operand so both contain 11 x 0.001 and 1 x 1000
+		compensatedResult1 = compensatedResult1.add(largeSum);
+		compensatedResult2 = compensatedResult2.add(smallSum);
+		naiveResult1 += largeSum.value();
+		naiveResult2 += smallSum.value();
+
+		// Kahan summation gave the same result no matter what order we added
+		Assert.assertEquals(1000.011, compensatedResult1.value(), 0.0);
+		Assert.assertEquals(1000.011, compensatedResult2.value(), 0.0);
+
+		// naive addition gave a small floating point error
+		Assert.assertEquals(1000.011, naiveResult1, 0.0);
+		Assert.assertEquals(1000.0109999999997, naiveResult2, 0.0);
+
+		Assert.assertEquals(compensatedResult1.value(), compensatedResult2.value(), 0.0);
+		Assert.assertEquals(naiveResult1, naiveResult2, 0.0001);
+		Assert.assertNotEquals(naiveResult1, naiveResult2, 0.0);
+	}
+
+	/**
+	 * Verifies the compensated value and the small error term reported by
+	 * {@link CompensatedSum#delta()} after repeatedly adding a raw double.
+	 */
+	@Test
+	public void testDelta() throws Exception {
+		CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0);
+		for (int i = 0; i < 10; i++) {
+			compensatedResult1 = compensatedResult1.add(0.001);
+		}
+		Assert.assertEquals(0.011, compensatedResult1.value(), 0.0);
+		// plain literal instead of the deprecated Double(String) constructor; same exact value
+		Assert.assertEquals(8.673617379884035E-19, compensatedResult1.delta(), 0.0);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
new file mode 100644
index 0000000..08fbe78
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DoubleSummaryAggregatorTest {
+
+	/**
+	 * Use some values from Anscombe's Quartet for testing.
+	 *
+	 * There was no particular reason to use these except they have known means and variance.
+	 *
+	 * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+	 */
+	@Test
+	public void testAnscomesQuartetXValues() throws Exception {
+
+		final Double[] q1x = { 10.0, 8.0, 13.0, 9.0, 11.0, 14.0, 6.0, 4.0, 12.0, 7.0, 5.0 };
+		final Double[] q4x = { 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 19.0, 8.0, 8.0, 8.0 };
+
+		NumericColumnSummary<Double> q1 = summarize(q1x);
+		NumericColumnSummary<Double> q4 = summarize(q4x);
+
+		Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0);
+		Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0);
+
+		Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d);
+		Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d);
+
+		double stddev = Math.sqrt(11.0);
+		Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d);
+		Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d);
+	}
+
+	/**
+	 * Use some values from Anscombe's Quartet for testing.
+	 *
+	 * There was no particular reason to use these except they have known means and variance.
+	 *
+	 * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+	 */
+	@Test
+	public void testAnscomesQuartetYValues() throws Exception {
+		final Double[] q1y = { 8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68 };
+		final Double[] q2y = { 9.14, 8.14, 8.74, 8.77, 9.26, 8.1, 6.13, 3.1, 9.13, 7.26, 4.74 };
+		final Double[] q3y = { 7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73 };
+		final Double[] q4y = { 6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.5, 5.56, 7.91, 6.89 };
+
+		NumericColumnSummary<Double> q1 = summarize(q1y);
+		NumericColumnSummary<Double> q2 = summarize(q2y);
+		NumericColumnSummary<Double> q3 = summarize(q3y);
+		NumericColumnSummary<Double> q4 = summarize(q4y);
+
+		// the y values are have less precisely matching means and variances
+
+		Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001);
+
+		Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01);
+	}
+
+	@Test
+	public void testIsNan() throws Exception {
+		DoubleSummaryAggregator ag = new DoubleSummaryAggregator();
+		Assert.assertFalse(ag.isNan(-1.0));
+		Assert.assertFalse(ag.isNan(0.0));
+		Assert.assertFalse(ag.isNan(23.0));
+		Assert.assertFalse(ag.isNan(Double.MAX_VALUE));
+		Assert.assertFalse(ag.isNan(Double.MIN_VALUE));
+		Assert.assertTrue(ag.isNan(Double.NaN));
+	}
+
+	@Test
+	public void testIsInfinite() throws Exception {
+		DoubleSummaryAggregator ag = new DoubleSummaryAggregator();
+		Assert.assertFalse(ag.isInfinite(-1.0));
+		Assert.assertFalse(ag.isInfinite(0.0));
+		Assert.assertFalse(ag.isInfinite(23.0));
+		Assert.assertFalse(ag.isInfinite(Double.MAX_VALUE));
+		Assert.assertFalse(ag.isInfinite(Double.MIN_VALUE));
+		Assert.assertTrue(ag.isInfinite(Double.POSITIVE_INFINITY));
+		Assert.assertTrue(ag.isInfinite(Double.NEGATIVE_INFINITY));
+	}
+
+	@Test
+	public void testMean() throws Exception {
+		Assert.assertEquals(50.0, summarize(0.0, 100.0).getMean(), 0.0);
+		Assert.assertEquals(33.333333, summarize(0.0, 0.0, 100.0).getMean(), 0.00001);
+		Assert.assertEquals(50.0, summarize(0.0, 0.0, 100.0, 100.0).getMean(), 0.0);
+		Assert.assertEquals(50.0, summarize(0.0, 100.0, null).getMean(), 0.0);
+		Assert.assertNull(summarize().getMean());
+	}
+
+	@Test
+	public void testSum() throws Exception {
+		Assert.assertEquals(100.0, summarize(0.0, 100.0).getSum().doubleValue(), 0.0);
+		Assert.assertEquals(15, summarize(1.0, 2.0, 3.0, 4.0, 5.0).getSum().doubleValue(), 0.0);
+		Assert.assertEquals(0, summarize(-100.0, 0.0, 100.0, null).getSum().doubleValue(), 0.0);
+		Assert.assertEquals(90, summarize(-10.0, 100.0, null).getSum().doubleValue(), 0.0);
+		Assert.assertNull(summarize().getSum());
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		Assert.assertEquals(1001.0, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMax().doubleValue(), 0.0);
+		Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0);
+		Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0);
+		Assert.assertNull(summarize().getMax());
+	}
+
+	@Test
+	public void testMin() throws Exception {
+		Assert.assertEquals(-1000, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMin().doubleValue(), 0.0);
+		Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0);
+		Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0);
+		Assert.assertNull(summarize().getMin());
+	}
+
+	@Test
+	public void testCounts() throws Exception {
+		NumericColumnSummary<Double> summary = summarize(Double.NaN, 1.0, null, 123.0, -44.00001, Double.POSITIVE_INFINITY, 55.0, Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY, null, Double.NaN);
+		Assert.assertEquals(11, summary.getTotalCount());
+		Assert.assertEquals(2, summary.getNullCount());
+		Assert.assertEquals(9, summary.getNonNullCount());
+		Assert.assertEquals(7, summary.getMissingCount());
+		Assert.assertEquals(4, summary.getNonMissingCount());
+		Assert.assertEquals(2, summary.getNanCount());
+		Assert.assertEquals(3, summary.getInfinityCount());
+	}
+
+	/**
+	 * Helper method for summarizing a list of values.
+	 *
+	 * This method breaks the rule of "testing only one thing" by aggregating and combining
+	 * a bunch of different ways.
+	 */
+	protected NumericColumnSummary<Double> summarize(Double... values) {
+		return new AggregateCombineHarness<Double,NumericColumnSummary<Double>,DoubleSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Double> result1, NumericColumnSummary<Double> result2) {
+				Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0);
+				Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0);
+				Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d);
+				Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+			}
+
+		}.summarize(values);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..a30d6aa
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.DoubleValue;
+import org.junit.Assert;
+
+/**
+ * Runs every {@link DoubleSummaryAggregatorTest} case against
+ * {@link ValueSummaryAggregator.DoubleValueSummaryAggregator}, which aggregates {@link DoubleValue}s.
+ */
+public class DoubleValueSummaryAggregatorTest extends DoubleSummaryAggregatorTest {
+
+	/**
+	 * Helper method for summarizing a list of values.
+	 *
+	 * <p>This method breaks the rule of "testing only one thing" by aggregating and combining
+	 * a bunch of different ways.
+	 *
+	 * <p>Each boxed {@code Double} is wrapped in a {@link DoubleValue}; {@code null} entries stay
+	 * {@code null} so null handling is exercised as well.
+	 */
+	@Override
+	protected NumericColumnSummary<Double> summarize(Double... values) {
+
+		DoubleValue[] doubleValues = new DoubleValue[values.length];
+		for (int i = 0; i < values.length; i++) {
+			if (values[i] != null) {
+				doubleValues[i] = new DoubleValue(values[i]);
+			}
+		}
+
+		return new AggregateCombineHarness<DoubleValue,NumericColumnSummary<Double>,ValueSummaryAggregator.DoubleValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Double> result1, NumericColumnSummary<Double> result2) {
+				// counts are exact and must not depend on how values were aggregated and combined
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+				Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+				Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+				Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+				Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+
+				Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0);
+				Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0);
+				Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d);
+				Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+			}
+
+		}.summarize(doubleValues);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
new file mode 100644
index 0000000..c761fc2
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link FloatSummaryAggregator}.
+ *
+ * <p>All aggregation goes through {@link #summarize(Float...)}, which subclasses may override
+ * to run the same assertions against a different aggregator implementation.
+ */
+public class FloatSummaryAggregatorTest {
+
+	/**
+	 * Use some values from Anscombe's Quartet for testing.
+	 *
+	 * <p>There was no particular reason to use these except they have known means and variance.
+	 *
+	 * <p>https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+	 */
+	@Test
+	public void testAnscomesQuartetXValues() throws Exception {
+
+		final Float[] q1x = { 10.0f, 8.0f, 13.0f, 9.0f, 11.0f, 14.0f, 6.0f, 4.0f, 12.0f, 7.0f, 5.0f };
+		final Float[] q4x = { 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 19.0f, 8.0f, 8.0f, 8.0f };
+
+		NumericColumnSummary<Float> q1 = summarize(q1x);
+		NumericColumnSummary<Float> q4 = summarize(q4x);
+
+		// mean, variance and standard deviation are reported as doubles, so compare as doubles
+		Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0);
+		Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0);
+
+		// both columns have squared deviations summing to 110; 110 / (n - 1) = 110 / 10 = 11
+		Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d);
+		Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d);
+
+		double stddev = Math.sqrt(11.0);
+		Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d);
+		Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d);
+	}
+
+	/**
+	 * Use some values from Anscombe's Quartet for testing.
+	 *
+	 * <p>There was no particular reason to use these except they have known means and variance.
+	 *
+	 * <p>https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+	 */
+	@Test
+	public void testAnscomesQuartetYValues() throws Exception {
+		final Float[] q1y = { 8.04f, 6.95f, 7.58f, 8.81f, 8.33f, 9.96f, 7.24f, 4.26f, 10.84f, 4.82f, 5.68f };
+		final Float[] q2y = { 9.14f, 8.14f, 8.74f, 8.77f, 9.26f, 8.1f, 6.13f, 3.1f, 9.13f, 7.26f, 4.74f };
+		final Float[] q3y = { 7.46f, 6.77f, 12.74f, 7.11f, 7.81f, 8.84f, 6.08f, 5.39f, 8.15f, 6.42f, 5.73f };
+		final Float[] q4y = { 6.58f, 5.76f, 7.71f, 8.84f, 8.47f, 7.04f, 5.25f, 12.5f, 5.56f, 7.91f, 6.89f };
+
+		NumericColumnSummary<Float> q1 = summarize(q1y);
+		NumericColumnSummary<Float> q2 = summarize(q2y);
+		NumericColumnSummary<Float> q3 = summarize(q3y);
+		NumericColumnSummary<Float> q4 = summarize(q4y);
+
+		// the y values have less precisely matching means and variances
+
+		Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001);
+		Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001);
+
+		Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01);
+		Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01);
+	}
+
+	@Test
+	public void testIsNan() throws Exception {
+		FloatSummaryAggregator ag = new FloatSummaryAggregator();
+		Assert.assertFalse(ag.isNan(-1.0f));
+		Assert.assertFalse(ag.isNan(0.0f));
+		Assert.assertFalse(ag.isNan(23.0f));
+		Assert.assertFalse(ag.isNan(Float.MAX_VALUE));
+		Assert.assertFalse(ag.isNan(Float.MIN_VALUE));
+		Assert.assertTrue(ag.isNan(Float.NaN));
+	}
+
+	@Test
+	public void testIsInfinite() throws Exception {
+		FloatSummaryAggregator ag = new FloatSummaryAggregator();
+		Assert.assertFalse(ag.isInfinite(-1.0f));
+		Assert.assertFalse(ag.isInfinite(0.0f));
+		Assert.assertFalse(ag.isInfinite(23.0f));
+		Assert.assertFalse(ag.isInfinite(Float.MAX_VALUE));
+		Assert.assertFalse(ag.isInfinite(Float.MIN_VALUE));
+		Assert.assertTrue(ag.isInfinite(Float.POSITIVE_INFINITY));
+		Assert.assertTrue(ag.isInfinite(Float.NEGATIVE_INFINITY));
+	}
+
+	@Test
+	public void testMean() throws Exception {
+		Assert.assertEquals(50.0, summarize(0.0f, 100.0f).getMean(), 0.0);
+		Assert.assertEquals(33.333333, summarize(0.0f, 0.0f, 100.0f).getMean(), 0.00001);
+		Assert.assertEquals(50.0, summarize(0.0f, 0.0f, 100.0f, 100.0f).getMean(), 0.0);
+		// nulls are excluded from the mean
+		Assert.assertEquals(50.0, summarize(0.0f, 100.0f, null).getMean(), 0.0);
+		Assert.assertNull(summarize().getMean());
+	}
+
+	@Test
+	public void testSum() throws Exception {
+		Assert.assertEquals(100.0, summarize(0.0f, 100.0f).getSum().floatValue(), 0.0f);
+		Assert.assertEquals(15, summarize(1.0f, 2.0f, 3.0f, 4.0f, 5.0f).getSum().floatValue(), 0.0f);
+		Assert.assertEquals(0, summarize(-100.0f, 0.0f, 100.0f, null).getSum().floatValue(), 0.0f);
+		Assert.assertEquals(90, summarize(-10.0f, 100.0f, null).getSum().floatValue(), 0.0f);
+		Assert.assertNull(summarize().getSum());
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		Assert.assertEquals(1001.0f, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMax().floatValue(), 0.0f);
+		Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f);
+		Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f);
+		Assert.assertNull(summarize().getMax());
+	}
+
+	@Test
+	public void testMin() throws Exception {
+		Assert.assertEquals(-1000, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMin().floatValue(), 0.0f);
+		Assert.assertEquals(-2.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f);
+		Assert.assertEquals(-2.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f);
+		Assert.assertNull(summarize().getMin());
+	}
+
+	/**
+	 * Helper method for summarizing a list of values.
+	 *
+	 * <p>This method breaks the rule of "testing only one thing" by aggregating
+	 * and combining a bunch of different ways.
+	 */
+	protected NumericColumnSummary<Float> summarize(Float... values) {
+
+		return new AggregateCombineHarness<Float,NumericColumnSummary<Float>,FloatSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Float> result1, NumericColumnSummary<Float> result2) {
+				// counts are exact and must not depend on how values were aggregated and combined
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+				Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+				Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+				Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+				Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+
+				Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f);
+				Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f);
+				Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d);
+				Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+			}
+
+		}.summarize(values);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..ff87946
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.FloatValue;
+import org.junit.Assert;
+
+/**
+ * Runs every {@link FloatSummaryAggregatorTest} case against
+ * {@link ValueSummaryAggregator.FloatValueSummaryAggregator}, which aggregates {@link FloatValue}s.
+ */
+public class FloatValueSummaryAggregatorTest extends FloatSummaryAggregatorTest {
+
+	/**
+	 * Helper method for summarizing a list of values.
+	 *
+	 * <p>This method breaks the rule of "testing only one thing" by aggregating
+	 * and combining a bunch of different ways.
+	 *
+	 * <p>Each boxed {@code Float} is wrapped in a {@link FloatValue}; {@code null} entries stay
+	 * {@code null} so null handling is exercised as well.
+	 */
+	@Override
+	protected NumericColumnSummary<Float> summarize(Float... values) {
+
+		FloatValue[] floatValues = new FloatValue[values.length];
+		for (int i = 0; i < values.length; i++) {
+			if (values[i] != null) {
+				floatValues[i] = new FloatValue(values[i]);
+			}
+		}
+
+		return new AggregateCombineHarness<FloatValue,NumericColumnSummary<Float>,ValueSummaryAggregator.FloatValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Float> result1, NumericColumnSummary<Float> result2) {
+				// counts are exact and must not depend on how values were aggregated and combined
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+				Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+				Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+				Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+				Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+
+				// NOTE(review): mean/stddev tolerances are looser here (1e-10) than in the
+				// Float base test (1e-12); kept as in the original
+				Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f);
+				Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f);
+				Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-10d);
+				Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-10d);
+			}
+
+		}.summarize(floatValues);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
new file mode 100644
index 0000000..110d2cc
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IntegerSummaryAggregatorTest {
+
+	@Test
+	public void testIsNan() throws Exception {
+		IntegerSummaryAggregator ag = new IntegerSummaryAggregator();
+		// always false for Integer
+		Assert.assertFalse(ag.isNan(-1));
+		Assert.assertFalse(ag.isNan(0));
+		Assert.assertFalse(ag.isNan(23));
+		Assert.assertFalse(ag.isNan(Integer.MAX_VALUE));
+		Assert.assertFalse(ag.isNan(Integer.MIN_VALUE));
+		Assert.assertFalse(ag.isNan(null));
+	}
+
+	@Test
+	public void testIsInfinite() throws Exception {
+		IntegerSummaryAggregator ag = new IntegerSummaryAggregator();
+		// always false for Integer
+		Assert.assertFalse(ag.isInfinite(-1));
+		Assert.assertFalse(ag.isInfinite(0));
+		Assert.assertFalse(ag.isInfinite(23));
+		Assert.assertFalse(ag.isInfinite(Integer.MAX_VALUE));
+		Assert.assertFalse(ag.isInfinite(Integer.MIN_VALUE));
+		Assert.assertFalse(ag.isInfinite(null));
+	}
+
+	@Test
+	public void testMean() throws Exception {
+		Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0);
+		Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001);
+		Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0);
+		Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0);
+		Assert.assertNull(summarize().getMean());
+	}
+
+	@Test
+	public void testSum() throws Exception {
+		Assert.assertEquals(100, summarize(0, 100).getSum().intValue());
+		Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().intValue());
+		Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().intValue());
+		Assert.assertEquals(90, summarize(-10, 100, null).getSum().intValue());
+		Assert.assertNull(summarize().getSum());
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().intValue());
+		Assert.assertEquals(0, summarize(Integer.MIN_VALUE, -1000, 0).getMax().intValue());
+		Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().intValue());
+		Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().intValue());
+		Assert.assertNull(summarize().getMax());
+	}
+
+	@Test
+	public void testMin() throws Exception {
+		Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().intValue());
+		Assert.assertEquals(Integer.MIN_VALUE, summarize(Integer.MIN_VALUE, -1000, 0).getMin().intValue());
+		Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().intValue());
+		Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().intValue());
+		Assert.assertNull(summarize().getMin());
+	}
+
+	/**
+	 * Helper method for summarizing a list of values
+	 */
+	protected NumericColumnSummary<Integer> summarize(Integer... values) {
+
+		return new AggregateCombineHarness<Integer,NumericColumnSummary<Integer>,IntegerSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Integer> result1, NumericColumnSummary<Integer> result2) {
+
+				Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(),result2.getNullCount());
+				Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount());
+				Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount());
+				Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount());
+				Assert.assertEquals(result1.getNanCount(),result2.getNanCount());
+
+				Assert.assertEquals(result1.containsNull(),result2.containsNull());
+				Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull());
+
+				Assert.assertEquals(result1.getMin().intValue(),result2.getMin().intValue());
+				Assert.assertEquals(result1.getMax().intValue(), result2.getMax().intValue());
+				Assert.assertEquals(result1.getSum().intValue(),result2.getSum().intValue());
+				Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(values);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..6ac5485
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.IntValue;
+import org.junit.Assert;
+
+/** Runs the {@link IntegerSummaryAggregatorTest} cases against the {@link IntValue} based aggregator. */
+public class IntegerValueSummaryAggregatorTest extends IntegerSummaryAggregatorTest {
+
+	/** Wraps each non-null input in an {@link IntValue} (null entries stay null) and summarizes them. */
+	@Override
+	protected NumericColumnSummary<Integer> summarize(Integer... values) {
+		IntValue[] wrapped = new IntValue[values.length];
+		int pos = 0;
+		for (Integer value : values) {
+			wrapped[pos++] = value == null ? null : new IntValue(value);
+		}
+
+		return new AggregateCombineHarness<IntValue, NumericColumnSummary<Integer>, ValueSummaryAggregator.IntegerValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Integer> first, NumericColumnSummary<Integer> second) {
+				// element counts
+				Assert.assertEquals(first.getTotalCount(), second.getTotalCount());
+				Assert.assertEquals(first.getNullCount(), second.getNullCount());
+				Assert.assertEquals(first.getMissingCount(), second.getMissingCount());
+				Assert.assertEquals(first.getNonMissingCount(), second.getNonMissingCount());
+				Assert.assertEquals(first.getInfinityCount(), second.getInfinityCount());
+				Assert.assertEquals(first.getNanCount(), second.getNanCount());
+				Assert.assertEquals(first.containsNull(), second.containsNull());
+				Assert.assertEquals(first.containsNonNull(), second.containsNonNull());
+				// exact statistics
+				Assert.assertEquals(first.getMin().intValue(), second.getMin().intValue());
+				Assert.assertEquals(first.getMax().intValue(), second.getMax().intValue());
+				Assert.assertEquals(first.getSum().intValue(), second.getSum().intValue());
+				// floating point statistics, compared within a tolerance
+				Assert.assertEquals(first.getMean().doubleValue(), second.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(first.getVariance().doubleValue(), second.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(first.getStandardDeviation().doubleValue(), second.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(wrapped);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
new file mode 100644
index 0000000..1905657
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LongSummaryAggregatorTest {
+
+	/** Long inputs (including null) that must never be reported as NaN or infinite. */
+	private static final Long[] FINITE_INPUTS = {-1L, 0L, 23L, Long.MAX_VALUE, Long.MIN_VALUE, null};
+
+	@Test
+	public void testIsNan() throws Exception {
+		LongSummaryAggregator aggregator = new LongSummaryAggregator();
+		// a Long can never be NaN
+		for (Long input : FINITE_INPUTS) {
+			Assert.assertFalse(aggregator.isNan(input));
+		}
+	}
+
+	@Test
+	public void testIsInfinite() throws Exception {
+		LongSummaryAggregator aggregator = new LongSummaryAggregator();
+		// a Long can never be infinite
+		for (Long input : FINITE_INPUTS) {
+			Assert.assertFalse(aggregator.isInfinite(input));
+		}
+	}
+
+	@Test
+	public void testMean() throws Exception {
+		Assert.assertEquals(50.0, summarize(0L, 100L).getMean(), 0.0);
+		Assert.assertEquals(33.333333, summarize(0L, 0L, 100L).getMean(), 0.00001);
+		Assert.assertEquals(50.0, summarize(0L, 0L, 100L, 100L).getMean(), 0.0);
+		Assert.assertEquals(50.0, summarize(0L, 100L, null).getMean(), 0.0);
+		Assert.assertNull(summarize().getMean());
+	}
+
+	@Test
+	public void testSum() throws Exception {
+		Assert.assertEquals(100L, summarize(0L, 100L).getSum().longValue());
+		Assert.assertEquals(15L, summarize(1L, 2L, 3L, 4L, 5L).getSum().longValue());
+		Assert.assertEquals(0L, summarize(-100L, 0L, 100L, null).getSum().longValue());
+		Assert.assertEquals(90L, summarize(-10L, 100L, null).getSum().longValue());
+		Assert.assertNull(summarize().getSum());
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		Long[] withoutNulls = {1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L};
+		Long[] withNulls = {1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L};
+		Assert.assertEquals(1001L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMax().longValue());
+		Assert.assertEquals(11L, summarize(withoutNulls).getMax().longValue());
+		Assert.assertEquals(11L, summarize(withNulls).getMax().longValue());
+		Assert.assertNull(summarize().getMax());
+	}
+
+	@Test
+	public void testMin() throws Exception {
+		Long[] withoutNulls = {1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L};
+		Long[] withNulls = {1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L};
+		Assert.assertEquals(-1000L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMin().longValue());
+		Assert.assertEquals(-2L, summarize(withoutNulls).getMin().longValue());
+		Assert.assertEquals(-2L, summarize(withNulls).getMin().longValue());
+		Assert.assertNull(summarize().getMin());
+	}
+
+	/**
+	 * Summarizes the given values with the harness, which aggregates and combines them several
+	 * different ways; compareResults below asserts that those runs agree.
+	 */
+	protected NumericColumnSummary<Long> summarize(Long... values) {
+		return new AggregateCombineHarness<Long, NumericColumnSummary<Long>, LongSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Long> first, NumericColumnSummary<Long> second) {
+				Assert.assertEquals(first.getTotalCount(), second.getTotalCount());
+				Assert.assertEquals(first.getNullCount(), second.getNullCount());
+				Assert.assertEquals(first.getMissingCount(), second.getMissingCount());
+				Assert.assertEquals(first.getNonMissingCount(), second.getNonMissingCount());
+				Assert.assertEquals(first.getInfinityCount(), second.getInfinityCount());
+				Assert.assertEquals(first.getNanCount(), second.getNanCount());
+				Assert.assertEquals(first.containsNull(), second.containsNull());
+				Assert.assertEquals(first.containsNonNull(), second.containsNonNull());
+				// exact statistics
+				Assert.assertEquals(first.getMin().longValue(), second.getMin().longValue());
+				Assert.assertEquals(first.getMax().longValue(), second.getMax().longValue());
+				Assert.assertEquals(first.getSum().longValue(), second.getSum().longValue());
+				// floating point statistics, compared within a tolerance
+				Assert.assertEquals(first.getMean().doubleValue(), second.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(first.getVariance().doubleValue(), second.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(first.getStandardDeviation().doubleValue(), second.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(values);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..eecda69
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+
+/** Runs the {@link LongSummaryAggregatorTest} cases against the {@link LongValue} based aggregator. */
+public class LongValueSummaryAggregatorTest extends LongSummaryAggregatorTest {
+
+	/**
+	 * Wraps each non-null input in a {@link LongValue} (null entries stay null) and summarizes
+	 * the wrapped values with the LongValue summary aggregator.
+	 */
+	@Override
+	protected NumericColumnSummary<Long> summarize(Long... values) {
+		LongValue[] wrapped = new LongValue[values.length];
+		int pos = 0;
+		for (Long value : values) {
+			wrapped[pos++] = value == null ? null : new LongValue(value);
+		}
+
+		return new AggregateCombineHarness<LongValue, NumericColumnSummary<Long>, ValueSummaryAggregator.LongValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Long> first, NumericColumnSummary<Long> second) {
+				// element counts
+				Assert.assertEquals(first.getTotalCount(), second.getTotalCount());
+				Assert.assertEquals(first.getNullCount(), second.getNullCount());
+				Assert.assertEquals(first.getMissingCount(), second.getMissingCount());
+				Assert.assertEquals(first.getNonMissingCount(), second.getNonMissingCount());
+				Assert.assertEquals(first.getInfinityCount(), second.getInfinityCount());
+				Assert.assertEquals(first.getNanCount(), second.getNanCount());
+				Assert.assertEquals(first.containsNull(), second.containsNull());
+				Assert.assertEquals(first.containsNonNull(), second.containsNonNull());
+				// exact statistics
+				Assert.assertEquals(first.getMin().longValue(), second.getMin().longValue());
+				Assert.assertEquals(first.getMax().longValue(), second.getMax().longValue());
+				Assert.assertEquals(first.getSum().longValue(), second.getSum().longValue());
+				// floating point statistics, compared within a tolerance
+				Assert.assertEquals(first.getMean().doubleValue(), second.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(first.getVariance().doubleValue(), second.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(first.getStandardDeviation().doubleValue(), second.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(wrapped);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
new file mode 100644
index 0000000..ebbf627
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class ShortSummaryAggregatorTest {
+
+	/** Short inputs (including null) that must never be reported as NaN or infinite. */
+	private static final Short[] FINITE_INPUTS = {(short) -1, (short) 0, (short) 23, Short.MAX_VALUE, Short.MIN_VALUE, null};
+
+	@Test
+	public void testIsNan() throws Exception {
+		ShortSummaryAggregator aggregator = new ShortSummaryAggregator();
+		// a Short can never be NaN
+		for (Short input : FINITE_INPUTS) {
+			Assert.assertFalse(aggregator.isNan(input));
+		}
+	}
+
+	@Test
+	public void testIsInfinite() throws Exception {
+		ShortSummaryAggregator aggregator = new ShortSummaryAggregator();
+		// a Short can never be infinite
+		for (Short input : FINITE_INPUTS) {
+			Assert.assertFalse(aggregator.isInfinite(input));
+		}
+	}
+
+	@Test
+	public void testMean() throws Exception {
+		Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0);
+		Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001);
+		Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0);
+		// null entries do not contribute to the mean
+		Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0);
+		Assert.assertNull(summarize().getMean());
+	}
+
+	@Test
+	public void testSum() throws Exception {
+		Assert.assertEquals(100, summarize(0, 100).getSum().shortValue());
+		Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().shortValue());
+		Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().shortValue());
+		Assert.assertEquals(90, summarize(-10, 100, null).getSum().shortValue());
+		Assert.assertNull(summarize().getSum());
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		Integer[] withoutNulls = {1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3};
+		Integer[] withNulls = {1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3};
+		Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().shortValue());
+		Assert.assertEquals(0, summarize((int) Short.MIN_VALUE, -1000, 0).getMax().shortValue());
+		Assert.assertEquals(11, summarize(withoutNulls).getMax().shortValue());
+		Assert.assertEquals(11, summarize(withNulls).getMax().shortValue());
+		Assert.assertNull(summarize().getMax());
+	}
+
+	@Test
+	public void testMin() throws Exception {
+		Integer[] withoutNulls = {1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3};
+		Integer[] withNulls = {1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3};
+		Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().shortValue());
+		Assert.assertEquals(Short.MIN_VALUE, summarize((int) Short.MIN_VALUE, -1000, 0).getMin().shortValue());
+		Assert.assertEquals(-2, summarize(withoutNulls).getMin().shortValue());
+		Assert.assertEquals(-2, summarize(withNulls).getMin().shortValue());
+		Assert.assertNull(summarize().getMin());
+	}
+
+	/**
+	 * Narrows each input to a {@link Short} (null entries stay null) and summarizes the result with
+	 * the harness, which aggregates and combines the values several different ways.
+	 */
+	protected NumericColumnSummary<Short> summarize(Integer... values) {
+		Short[] narrowed = new Short[values.length];
+		int pos = 0;
+		for (Integer value : values) {
+			narrowed[pos++] = value == null ? null : value.shortValue();
+		}
+
+		return new AggregateCombineHarness<Short, NumericColumnSummary<Short>, ShortSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Short> first, NumericColumnSummary<Short> second) {
+				// element counts
+				Assert.assertEquals(first.getTotalCount(), second.getTotalCount());
+				Assert.assertEquals(first.getNullCount(), second.getNullCount());
+				Assert.assertEquals(first.getMissingCount(), second.getMissingCount());
+				Assert.assertEquals(first.getNonMissingCount(), second.getNonMissingCount());
+				Assert.assertEquals(first.getInfinityCount(), second.getInfinityCount());
+				Assert.assertEquals(first.getNanCount(), second.getNanCount());
+				Assert.assertEquals(first.containsNull(), second.containsNull());
+				Assert.assertEquals(first.containsNonNull(), second.containsNonNull());
+				// exact statistics
+				Assert.assertEquals(first.getMin().shortValue(), second.getMin().shortValue());
+				Assert.assertEquals(first.getMax().shortValue(), second.getMax().shortValue());
+				Assert.assertEquals(first.getSum().shortValue(), second.getSum().shortValue());
+				// floating point statistics, compared within a tolerance
+				Assert.assertEquals(first.getMean().doubleValue(), second.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(first.getVariance().doubleValue(), second.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(first.getStandardDeviation().doubleValue(), second.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(narrowed);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..8a8e7aa
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.ShortValue;
+import org.junit.Assert;
+
+/** Runs the {@link ShortSummaryAggregatorTest} cases against the {@link ShortValue} based aggregator. */
+public class ShortValueSummaryAggregatorTest extends ShortSummaryAggregatorTest {
+
+	/**
+	 * Helper method for summarizing a list of values: each non-null input is narrowed into a
+	 * {@link ShortValue} (null entries stay null) and summarized with the ShortValue aggregator.
+	 */
+	@Override
+	protected NumericColumnSummary<Short> summarize(Integer... values) {
+		ShortValue[] shortValues = new ShortValue[values.length];
+		for (int i = 0; i < values.length; i++) {
+			if (values[i] != null) {
+				shortValues[i] = new ShortValue(values[i].shortValue());
+			}
+		}
+
+		return new AggregateCombineHarness<ShortValue, NumericColumnSummary<Short>, ValueSummaryAggregator.ShortValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(NumericColumnSummary<Short> result1, NumericColumnSummary<Short> result2) {
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+				Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+				Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+				Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+				Assert.assertEquals(result1.containsNull(), result2.containsNull());
+				Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+				Assert.assertEquals(result1.getMin().shortValue(), result2.getMin().shortValue());
+				Assert.assertEquals(result1.getMax().shortValue(), result2.getMax().shortValue());
+				Assert.assertEquals(result1.getSum().shortValue(), result2.getSum().shortValue());
+				Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d);
+				Assert.assertEquals(result1.getVariance().doubleValue(), result2.getVariance().doubleValue(), 1e-9d);
+				Assert.assertEquals(result1.getStandardDeviation().doubleValue(), result2.getStandardDeviation().doubleValue(), 1e-12d);
+			}
+		}.summarize(shortValues);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
new file mode 100644
index 0000000..02fc125
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/** Tests for {@link StringSummaryAggregator}. */
+public class StringSummaryAggregatorTest {
+
+	@Test
+	public void testMixedGroup() {
+		StringColumnSummary summary = summarize("abc", "", null, "  ", "defghi", "foo", null, null, "", " ");
+		Assert.assertEquals(10, summary.getTotalCount());
+		Assert.assertEquals(3, summary.getNullCount());
+		Assert.assertEquals(7, summary.getNonNullCount());
+		Assert.assertEquals(2, summary.getEmptyCount());
+		Assert.assertEquals(0, summary.getMinLength().intValue());
+		Assert.assertEquals(6, summary.getMaxLength().intValue());
+		Assert.assertEquals(2.142857, summary.getMeanLength().doubleValue(), 0.001);
+	}
+
+	@Test
+	public void testAllNullStrings() {
+		StringColumnSummary summary = summarize(null, null, null, null);
+		Assert.assertEquals(4, summary.getTotalCount());
+		Assert.assertEquals(4, summary.getNullCount());
+		Assert.assertEquals(0, summary.getNonNullCount());
+		Assert.assertEquals(0, summary.getEmptyCount());
+		Assert.assertNull(summary.getMinLength());
+		Assert.assertNull(summary.getMaxLength());
+		Assert.assertNull(summary.getMeanLength());
+	}
+
+	@Test
+	public void testAllWithValues() {
+		StringColumnSummary summary = summarize("cat", "hat", "dog", "frog");
+		Assert.assertEquals(4, summary.getTotalCount());
+		Assert.assertEquals(0, summary.getNullCount());
+		Assert.assertEquals(4, summary.getNonNullCount());
+		Assert.assertEquals(0, summary.getEmptyCount());
+		Assert.assertEquals(3, summary.getMinLength().intValue());
+		Assert.assertEquals(4, summary.getMaxLength().intValue());
+		Assert.assertEquals(3.25, summary.getMeanLength().doubleValue(), 0.0);
+	}
+
+	/**
+	 * Helper method for summarizing a list of values.
+	 *
+	 * <p>This method breaks the rule of "testing only one thing" by aggregating and combining
+	 * a bunch of different ways.
+	 */
+	protected StringColumnSummary summarize(String... values) {
+		return new AggregateCombineHarness<String, StringColumnSummary, StringSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) {
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+				Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount());
+				Assert.assertEquals(result1.getMinLength(), result2.getMinLength());
+				Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength());
+				// mean length is null e.g. when every value is null; otherwise compare within a tolerance
+				if (result1.getMeanLength() == null) {
+					Assert.assertNull(result2.getMeanLength());
+				} else {
+					Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d);
+				}
+			}
+		}.summarize(values);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..19bfd52
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+
+/**
+ * Runs the {@link StringSummaryAggregatorTest} cases against the {@link StringValue} based aggregator.
+ */
+public class StringValueSummaryAggregatorTest extends StringSummaryAggregatorTest {
+
+	/**
+	 * Helper method for summarizing a list of values: each non-null input is wrapped in a
+	 * {@link StringValue} (null entries stay null) before being aggregated and combined a bunch
+	 * of different ways by the harness.
+	 */
+	@Override
+	protected StringColumnSummary summarize(String... values) {
+		StringValue[] stringValues = new StringValue[values.length];
+		for (int i = 0; i < values.length; i++) {
+			if (values[i] != null) {
+				stringValues[i] = new StringValue(values[i]);
+			}
+		}
+
+		return new AggregateCombineHarness<StringValue, StringColumnSummary, ValueSummaryAggregator.StringValueSummaryAggregator>() {
+
+			@Override
+			protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) {
+				Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+				Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+				Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+				Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount());
+				Assert.assertEquals(result1.getMinLength(), result2.getMinLength());
+				Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength());
+				// mean length is null e.g. when every value is null; otherwise compare within a tolerance
+				if (result1.getMeanLength() == null) {
+					Assert.assertNull(result2.getMeanLength());
+				} else {
+					Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d);
+				}
+			}
+		}.summarize(stringValues);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
new file mode 100644
index 0000000..8134a90
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.types.*;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+public class SummaryAggregatorFactoryTest {
+
+	@Test
+	public void testCreate() throws Exception {
+		// String, boxed primitives and Flink Value types get dedicated aggregators; others use ObjectSummaryAggregator
+		assertCreates(StringSummaryAggregator.class, String.class);
+		assertCreates(ShortSummaryAggregator.class, Short.class);
+		assertCreates(IntegerSummaryAggregator.class, Integer.class);
+		assertCreates(LongSummaryAggregator.class, Long.class);
+		assertCreates(FloatSummaryAggregator.class, Float.class);
+		assertCreates(DoubleSummaryAggregator.class, Double.class);
+		assertCreates(BooleanSummaryAggregator.class, Boolean.class);
+		assertCreates(ValueSummaryAggregator.StringValueSummaryAggregator.class, StringValue.class);
+		assertCreates(ValueSummaryAggregator.ShortValueSummaryAggregator.class, ShortValue.class);
+		assertCreates(ValueSummaryAggregator.IntegerValueSummaryAggregator.class, IntValue.class);
+		assertCreates(ValueSummaryAggregator.LongValueSummaryAggregator.class, LongValue.class);
+		assertCreates(ValueSummaryAggregator.FloatValueSummaryAggregator.class, FloatValue.class);
+		assertCreates(ValueSummaryAggregator.DoubleValueSummaryAggregator.class, DoubleValue.class);
+		assertCreates(ValueSummaryAggregator.BooleanValueSummaryAggregator.class, BooleanValue.class);
+		assertCreates(ObjectSummaryAggregator.class, Object.class);
+		assertCreates(ObjectSummaryAggregator.class, List.class);
+	}
+
+	/** Asserts that the factory creates an aggregator of exactly class {@code expected} for {@code type}. */
+	private static void assertCreates(Class<?> expected, Class<?> type) throws Exception {
+		Assert.assertEquals(expected, SummaryAggregatorFactory.create(type).getClass());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index afbcb89..1409848 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -22,9 +22,15 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.summarize.BooleanColumnSummary;
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.DoubleValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -106,4 +112,76 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 		Assert.assertEquals(checksum.getCount(), 15);
 		Assert.assertEquals(checksum.getChecksum(), 55);
 	}
+
+	/** Checks the per-column statistics that DataSetUtils.summarize computes for a mixed-type Tuple8 DataSet. */
+	@Test
+	public void testSummarize() throws Exception {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		List<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> rows = new ArrayList<>();
+		rows.add(new Tuple8<>((short) 1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0)));
+		rows.add(new Tuple8<>((short) 2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0)));
+		rows.add(new Tuple8<>((short) 4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0)));
+		rows.add(new Tuple8<>((short) 10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0)));
+		rows.add(new Tuple8<>((short) 5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0)));
+		rows.add(new Tuple8<>((short) 6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0)));
+		rows.add(new Tuple8<>((short) 7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0)));
+		rows.add(new Tuple8<>((short) 8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0)));
+
+		// the summary must not depend on element order, so hand the rows over shuffled
+		Collections.shuffle(rows);
+		DataSet<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> input = environment.fromCollection(rows);
+
+		Tuple summary = DataSetUtils.summarize(input);
+		Assert.assertEquals(8, summary.getArity());
+
+		NumericColumnSummary<Short> shortColumn = summary.getField(0);
+		Assert.assertEquals(8, shortColumn.getNonMissingCount());
+		Assert.assertEquals(1, shortColumn.getMin().shortValue());
+		Assert.assertEquals(10, shortColumn.getMax().shortValue());
+		Assert.assertEquals(43 / 8.0, shortColumn.getMean().doubleValue(), 0.0);
+
+		NumericColumnSummary<Integer> intColumn = summary.getField(1);
+		Assert.assertEquals(1, intColumn.getMin().intValue());
+		Assert.assertEquals(10, intColumn.getMax().intValue());
+		Assert.assertEquals(43 / 8.0, intColumn.getMean().doubleValue(), 0.0);
+
+		NumericColumnSummary<Long> longColumn = summary.getField(2);
+		Assert.assertEquals(-100L, longColumn.getMin().longValue());
+		Assert.assertEquals(10000L, longColumn.getMax().longValue());
+
+		NumericColumnSummary<Float> floatColumn = summary.getField(3);
+		Assert.assertEquals(8, floatColumn.getTotalCount());
+		Assert.assertEquals(0.001, floatColumn.getMin().doubleValue(), 1e-7);
+		Assert.assertEquals(0.89999999, floatColumn.getMax().doubleValue(), 1e-7);
+		Assert.assertEquals(0.2376249988883501, floatColumn.getMean().doubleValue(), 1e-12);
+		Assert.assertEquals(0.0768965488108089, floatColumn.getVariance().doubleValue(), 1e-8);
+		Assert.assertEquals(0.27730226975415995, floatColumn.getStandardDeviation().doubleValue(), 1e-12);
+
+		// +Infinity and NaN are expected to be reported as missing, leaving six usable values
+		NumericColumnSummary<Double> doubleColumn = summary.getField(4);
+		Assert.assertEquals(6, doubleColumn.getNonMissingCount());
+		Assert.assertEquals(2, doubleColumn.getMissingCount());
+		Assert.assertEquals(0.0000000000023, doubleColumn.getMin().doubleValue(), 0.0);
+		Assert.assertEquals(79.5, doubleColumn.getMax().doubleValue(), 1e-12);
+
+		StringColumnSummary stringColumn = summary.getField(5);
+		Assert.assertEquals(8, stringColumn.getTotalCount());
+		Assert.assertEquals(0, stringColumn.getNullCount());
+		Assert.assertEquals(8, stringColumn.getNonNullCount());
+		Assert.assertEquals(2, stringColumn.getEmptyCount());
+		Assert.assertEquals(0, stringColumn.getMinLength().intValue());
+		Assert.assertEquals(16, stringColumn.getMaxLength().intValue());
+		Assert.assertEquals(40.0 / 8, stringColumn.getMeanLength().doubleValue(), 1e-4);
+
+		BooleanColumnSummary booleanColumn = summary.getField(6);
+		Assert.assertEquals(8, booleanColumn.getTotalCount());
+		Assert.assertEquals(2, booleanColumn.getFalseCount());
+		Assert.assertEquals(6, booleanColumn.getTrueCount());
+		Assert.assertEquals(0, booleanColumn.getNullCount());
+
+		NumericColumnSummary<Double> doubleValueColumn = summary.getField(7);
+		Assert.assertEquals(100.0, doubleValueColumn.getMax().doubleValue(), 1e-5);
+		Assert.assertEquals(50.0, doubleValueColumn.getMin().doubleValue(), 1e-5);
+	}
 }


Mime
View raw message