flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-2683] [FLINK-2682] [runtime] Add dedicated operator for aligned processing time windows.
Date Tue, 22 Sep 2015 11:25:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 64e1dc6fe -> b8f58fab5


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
new file mode 100644
index 0000000..464df32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.TriggerTimer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("serial")
+public class AggregatingAlignedProcessingTimeWindowOperatorTest {
+
+	@SuppressWarnings("unchecked")
+	private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);
+
+	@SuppressWarnings("unchecked")
+	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
+	
+	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
+		@Override
+		public Integer getKey(Integer value) {
+			return value;
+		}
+	};
+	
+	private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
+		@Override
+		public Integer reduce(Integer value1, Integer value2) {
+			return value1 + value2;
+		}
+	};
+
+	// ------------------------------------------------------------------------
+
+	@After
+	public void checkNoTriggerThreadsRunning() {
+		// make sure that all the threads we trigger are shut down
+		long deadline = System.currentTimeMillis() + 5000;
+		while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
+			try {
+				Thread.sleep(10);
+			}
+			catch (InterruptedException ignored) {}
+		}
+
+		assertTrue("Not all trigger threads where properly shut down",
+				TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0);
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testInvalidParameters() {
+		try {
+			assertInvalidParameter(-1L, -1L);
+			assertInvalidParameter(10000L, -1L);
+			assertInvalidParameter(-1L, 1000L);
+			assertInvalidParameter(1000L, 2000L);
+			
+			// actual internal slide is too low here:
+			assertInvalidParameter(1000L, 999L);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWindowSizeAndSlide() {
+		try {
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
+			assertEquals(5000, op.getWindowSize());
+			assertEquals(1000, op.getWindowSlide());
+			assertEquals(1000, op.getPaneSize());
+			assertEquals(5, op.getNumPanesPerWindow());
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
+			assertEquals(1000, op.getWindowSize());
+			assertEquals(1000, op.getWindowSlide());
+			assertEquals(1000, op.getPaneSize());
+			assertEquals(1, op.getNumPanesPerWindow());
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
+			assertEquals(1500, op.getWindowSize());
+			assertEquals(1000, op.getWindowSlide());
+			assertEquals(500, op.getPaneSize());
+			assertEquals(3, op.getNumPanesPerWindow());
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
+			assertEquals(1200, op.getWindowSize());
+			assertEquals(1100, op.getWindowSlide());
+			assertEquals(100, op.getPaneSize());
+			assertEquals(12, op.getNumPanesPerWindow());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testWindowTriggerTimeAlignment() {
+		try {
+			@SuppressWarnings("unchecked")
+			final Output<StreamRecord<String>> mockOut = mock(Output.class);
+			
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+			
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
+			op.setup(mockOut, mockContext);
+			op.open(new Configuration());
+			assertTrue(op.getNextSlideTime() % 1000 == 0);
+			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+			op.dispose();
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
+			op.setup(mockOut, mockContext);
+			op.open(new Configuration());
+			assertTrue(op.getNextSlideTime() % 1000 == 0);
+			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+			op.dispose();
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
+			op.setup(mockOut, mockContext);
+			op.open(new Configuration());
+			assertTrue(op.getNextSlideTime() % 500 == 0);
+			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
+			op.dispose();
+
+			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
+			op.setup(mockOut, mockContext);
+			op.open(new Configuration());
+			assertTrue(op.getNextSlideTime() % 100 == 0);
+			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testTumblingWindowUniqueElements() {
+		try {
+			final int windowSize = 50;
+			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+			
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							sumFunction, identitySelector, windowSize, windowSize);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+
+			final int numElements = 1000;
+
+			for (int i = 0; i < numElements; i++) {
+				op.processElement(new StreamRecord<Integer>(i));
+				Thread.sleep(1);
+			}
+
+			op.close();
+			op.dispose();
+
+			// get and verify the result
+			List<Integer> result = out.getElements();
+			assertEquals(numElements, result.size());
+
+			Collections.sort(result);
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(i, result.get(i).intValue());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testTumblingWindowDuplicateElements() {
+		try {
+			final int windowSize = 50;
+			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+			
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							sumFunction, identitySelector, windowSize, windowSize);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+
+			final int numWindows = 10;
+
+			long previousNextTime = 0;
+			int window = 1;
+			
+			while (window <= numWindows) {
+				long nextTime = op.getNextEvaluationTime();
+				int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
+				
+				op.processElement(new StreamRecord<Integer>(val));
+				
+				if (nextTime != previousNextTime) {
+					window++;
+					previousNextTime = nextTime;
+				}
+				
+				Thread.sleep(1);
+			}
+
+			op.close();
+			op.dispose();
+			
+			List<Integer> result = out.getElements();
+			
+			// we have ideally one element per window. we may have more, when we emitted a value into the
+			// successive window (corner case), so we can have twice the number of elements, in the worst case.
+			assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
+
+			// deduplicate for more accurate checks
+			HashSet<Integer> set = new HashSet<>(result);
+			assertTrue(set.size() == 10 || set.size() == 11);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSlidingWindow() {
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+
+			// sliding window of 150 milliseconds that slides every 50 milliseconds
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+
+			final int numElements = 1000;
+
+			for (int i = 0; i < numElements; i++) {
+				op.processElement(new StreamRecord<Integer>(i));
+				Thread.sleep(1);
+			}
+
+			op.close();
+			op.dispose();
+
+			// get and verify the result
+			List<Integer> result = out.getElements();
+			
+			// every element can occur between one and three times
+			if (result.size() < numElements || result.size() > 3 * numElements) {
+				System.out.println(result);
+				fail("Wrong number of results: " + result.size());
+			}
+
+			Collections.sort(result);
+			int lastNum = -1;
+			int lastCount = -1;
+			
+			for (int num : result) {
+				if (num == lastNum) {
+					lastCount++;
+					assertTrue(lastCount <= 3);
+				}
+				else {
+					lastNum = num;
+					lastCount = 1;
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSlidingWindowSingleElements() {
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+
+			// sliding window of 150 milliseconds that slides every 50 milliseconds
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+			
+			op.processElement(new StreamRecord<Integer>(1));
+			op.processElement(new StreamRecord<Integer>(2));
+
+			// each element should end up in the output three times
+			// wait until the elements have arrived 6 times in the output
+			out.waitForNElements(6, 120000);
+			
+			List<Integer> result = out.getElements();
+			assertEquals(6, result.size());
+			
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+			
+			op.close();
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testEmitTrailingDataOnClose() {
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>();
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+			
+			// the operator has a window time that is so long that it will not fire in this test
+			final long oneYear = 365L * 24 * 60 * 60 * 1000;
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op = 
+					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);
+			
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+			
+			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+			for (Integer i : data) {
+				op.processElement(new StreamRecord<Integer>(i));
+			}
+			
+			op.close();
+			op.dispose();
+			
+			// get and verify the result
+			List<Integer> result = out.getElements();
+			Collections.sort(result);
+			assertEquals(data, result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPropagateExceptionsFromTrigger() {
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>();
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+
+			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
+
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 200, 50);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+
+			try {
+				long nextWindowTime = op.getNextEvaluationTime();
+				int val = 0;
+				for (int num = 0; num < Integer.MAX_VALUE; num++) {
+					op.processElement(new StreamRecord<Integer>(val++));
+					Thread.sleep(1);
+					
+					// when the window has advanced, reset the value, to generate the same values
+					// in the next pane again. This causes the aggregation on trigger to reduce values
+					if (op.getNextEvaluationTime() != nextWindowTime) {
+						nextWindowTime = op.getNextEvaluationTime();
+						val = 0;
+					}
+				}
+				fail("This should really have failed with an exception quite a while ago...");
+			}
+			catch (Exception e) {
+				assertNotNull(e.getCause());
+				assertTrue(e.getCause().getMessage().contains("Artificial Test Exception"));
+			}
+			
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPropagateExceptionsFromProcessElement() {
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>();
+
+			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
+			when(mockContext.getTaskName()).thenReturn("Test task name");
+
+			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
+
+			// the operator has a window time that is so long that it will not fire in this test
+			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
+			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							failingFunction, identitySelector, hundredYears, hundredYears);
+
+			op.setup(out, mockContext);
+			op.open(new Configuration());
+
+			for (int i = 0; i < 100; i++) {
+				op.processElement(new StreamRecord<Integer>(1));
+			}
+			
+			try {
+				op.processElement(new StreamRecord<Integer>(1));
+				fail("This fail with an exception");
+			}
+			catch (Exception e) {
+				assertTrue(e.getMessage().contains("Artificial Test Exception"));
+			}
+
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	private void assertInvalidParameter(long windowSize, long windowSlide) {
+		try {
+			new AggregatingProcessingTimeWindowOperator<String, String>(
+					mockFunction, mockKeySelector, windowSize, windowSlide);
+			fail("This should fail with an IllegalArgumentException");
+		}
+		catch (IllegalArgumentException e) {
+			// expected
+		}
+		catch (Exception e) {
+			fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	private static class FailingFunction implements ReduceFunction<Integer> {
+
+		private final int failAfterElements;
+		
+		private int numElements;
+
+		FailingFunction(int failAfterElements) {
+			this.failAfterElements = failAfterElements;
+		}
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			numElements++;
+
+			if (numElements >= failAfterElements) {
+				throw new Exception("Artificial Test Exception");
+			}
+			
+			return value1 + value2;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
new file mode 100644
index 0000000..9f6858d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CollectingOutput<T> implements Output<StreamRecord<T>> {
+	
+	private final List<T> elements = new ArrayList<>();
+
+	private final int timeStampModulus;
+
+
+	public CollectingOutput() {
+		this.timeStampModulus = 0;
+	}
+	
+	public CollectingOutput(int timeStampModulus) {
+		this.timeStampModulus = timeStampModulus;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public List<T> getElements() {
+		return elements;
+	}
+	
+	public void waitForNElements(int n, long timeout) throws InterruptedException {
+		long deadline = System.currentTimeMillis() + timeout;
+		synchronized (elements) {
+			long now;
+			while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
+				elements.wait(deadline - now);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void emitWatermark(Watermark mark) {
+		throw new UnsupportedOperationException("the output should not emit watermarks");
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		elements.add(record.getValue());
+		
+		if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
+			throw new IllegalArgumentException("Invalid timestamp");
+		}
+		synchronized (elements) {
+			elements.notifyAll();
+		}
+	}
+
+	@Override
+	public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
new file mode 100644
index 0000000..2a9e203
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class KeyMapPutIfAbsentTest {
+	
+	@Test
+	public void testPutIfAbsentUniqueKeysAndGrowth() {
+		try {
+			KeyMap<Integer, Integer> map = new KeyMap<>();
+			IntegerFactory factory = new IntegerFactory();
+			
+			final int numElements = 1000000;
+			
+			for (int i = 0; i < numElements; i++) {
+				factory.set(2 * i + 1);
+				map.putIfAbsent(i, factory);
+
+				assertEquals(i+1, map.size());
+				assertTrue(map.getCurrentTableCapacity() > map.size());
+				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
+				assertTrue(map.size() <= map.getRehashThreshold());
+			}
+			
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(2 * i + 1, map.get(i).intValue());
+			}
+			
+			for (int i = numElements - 1; i >= 0; i--) {
+				assertEquals(2 * i + 1, map.get(i).intValue());
+			}
+
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+			assertTrue(map.getLongestChainLength() <= 7);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPutIfAbsentDuplicateKeysAndGrowth() {
+		try {
+			KeyMap<Integer, Integer> map = new KeyMap<>();
+			IntegerFactory factory = new IntegerFactory();
+			
+			final int numElements = 1000000;
+
+			for (int i = 0; i < numElements; i++) {
+				int val = 2 * i + 1;
+				factory.set(val);
+				Integer put = map.putIfAbsent(i, factory);
+				assertEquals(val, put.intValue());
+			}
+
+			for (int i = 0; i < numElements; i += 3) {
+				factory.set(2 * i);
+				Integer put = map.putIfAbsent(i, factory);
+				assertEquals(2 * i + 1, put.intValue());
+			}
+
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(2 * i + 1, map.get(i).intValue());
+			}
+
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+			assertTrue(map.getLongestChainLength() <= 7);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	private static class IntegerFactory implements KeyMap.LazyFactory<Integer> {
+		
+		private Integer toCreate;
+		
+		public void set(Integer toCreate) {
+			this.toCreate = toCreate;
+		}
+
+		@Override
+		public Integer create() {
+			return toCreate;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
new file mode 100644
index 0000000..7335976
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.junit.Test;
+
+import java.util.BitSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class KeyMapPutTest {
+
+	@Test
+	public void testPutUniqueKeysAndGrowth() {
+		try {
+			KeyMap<Integer, Integer> map = new KeyMap<>();
+
+			final int numElements = 1000000;
+
+			for (int i = 0; i < numElements; i++) {
+				map.put(i, 2 * i + 1);
+
+				assertEquals(i+1, map.size());
+				assertTrue(map.getCurrentTableCapacity() > map.size());
+				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
+				assertTrue(map.size() <= map.getRehashThreshold());
+			}
+
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(2 * i + 1, map.get(i).intValue());
+			}
+
+			for (int i = numElements - 1; i >= 0; i--) {
+				assertEquals(2 * i + 1, map.get(i).intValue());
+			}
+
+			BitSet bitset = new BitSet();
+			int numContained = 0;
+			for (KeyMap.Entry<Integer, Integer> entry : map) {
+				numContained++;
+				
+				assertEquals(entry.getKey() * 2 + 1, entry.getValue().intValue());
+				assertFalse(bitset.get(entry.getKey()));
+				bitset.set(entry.getKey());
+			}
+
+			assertEquals(numElements, numContained);
+			assertEquals(numElements, bitset.cardinality());
+			
+			
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+			assertTrue(map.getLongestChainLength() <= 7);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPutDuplicateKeysAndGrowth() {
+		try {
+			final KeyMap<Integer, Integer> map = new KeyMap<>();
+			final int numElements = 1000000;
+
+			for (int i = 0; i < numElements; i++) {
+				Integer put = map.put(i, 2*i+1);
+				assertNull(put);
+			}
+
+			for (int i = 0; i < numElements; i += 3) {
+				Integer put = map.put(i, 2*i);
+				assertNotNull(put);
+				assertEquals(2*i+1, put.intValue());
+			}
+
+			for (int i = 0; i < numElements; i++) {
+				int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
+				assertEquals(expected, map.get(i).intValue());
+			}
+			
+			assertEquals(numElements, map.size());
+			assertEquals(numElements, map.traverseAndCountElements());
+			assertEquals(1 << 21, map.getCurrentTableCapacity());
+			assertTrue(map.getLongestChainLength() <= 7);
+
+			
+			BitSet bitset = new BitSet();
+			int numContained = 0;
+			for (KeyMap.Entry<Integer, Integer> entry : map) {
+				numContained++;
+
+				int key = entry.getKey();
+				int expected = key % 3 == 0 ? (2*key) : (2*key+1);
+
+				assertEquals(expected, entry.getValue().intValue());
+				assertFalse(bitset.get(key));
+				bitset.set(key);
+			}
+
+			assertEquals(numElements, numContained);
+			assertEquals(numElements, bitset.cardinality());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
new file mode 100644
index 0000000..be71af2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+public class KeyMapTest {
+	
+	/**
+	 * Verifies the table capacity, its log2, the shift and the rehash threshold that
+	 * a new KeyMap reports for a range of requested sizes, that a negative size is
+	 * rejected, and that a very large size yields the largest power-of-two capacity.
+	 */
+	@Test
+	public void testInitialSizeComputation() {
+		try {
+			// default constructor
+			assertTableLayout(new KeyMap<String, String>(), 64, 6, 24, 48);
+
+			// { requested number of elements, capacity, log2(capacity), shift, rehash threshold }
+			final int[][] sizedCases = {
+				{   0,  64, 6, 24,  48 },
+				{   1,  64, 6, 24,  48 },
+				{   9,  64, 6, 24,  48 },
+				{  63,  64, 6, 24,  48 },
+				{  64, 128, 7, 23,  96 },
+				{ 500, 512, 9, 21, 384 },
+				{ 127, 128, 7, 23,  96 }
+			};
+			for (int[] c : sizedCases) {
+				assertTableLayout(new KeyMap<String, String>(c[0]), c[1], c[2], c[3], c[4]);
+			}
+
+			// no negative number of elements
+			try {
+				new KeyMap<>(-1);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// check integer overflow
+			final int largestCapacity = Integer.highestOneBit(Integer.MAX_VALUE);
+			try {
+				assertTableLayout(
+						new KeyMap<String, String>(0x65715522),
+						largestCapacity, 30, 0, largestCapacity / 4 * 3);
+			}
+			catch (OutOfMemoryError e) {
+				// this may indeed happen in small test setups. we tolerate this in this test
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Asserts the four layout figures reported by the given map, in the order
+	 * capacity, log2 of the capacity, shift, rehash threshold.
+	 */
+	private static void assertTableLayout(
+			KeyMap<?, ?> map, int capacity, int log2Capacity, int shift, int rehashThreshold) {
+		assertEquals(capacity, map.getCurrentTableCapacity());
+		assertEquals(log2Capacity, map.getLog2TableCapacity());
+		assertEquals(shift, map.getShift());
+		assertEquals(rehashThreshold, map.getRehashThreshold());
+	}
+
+	/**
+	 * Inserts random key/value pairs into a KeyMap and into a java.util.HashMap and
+	 * checks that both return the same value (or both return null) for every
+	 * generated key.
+	 *
+	 * <p>The seed is drawn randomly for each run. It is part of every failure
+	 * message, so that a failing run can be replayed with the same data.
+	 */
+	@Test
+	public void testPutAndGetRandom() {
+		final long seed = new Random().nextLong();
+		final String seedInfo = "seed=" + seed;
+
+		try {
+			final KeyMap<Integer, Integer> map = new KeyMap<>();
+			final Random rnd = new Random(seed);
+			final int numElements = 10000;
+
+			final HashMap<Integer, Integer> groundTruth = new HashMap<>();
+
+			for (int i = 0; i < numElements; i++) {
+				Integer key = rnd.nextInt();
+				Integer value = rnd.nextInt();
+
+				// only about half of the generated pairs are actually inserted
+				if (rnd.nextBoolean()) {
+					groundTruth.put(key, value);
+					map.put(key, value);
+				}
+			}
+
+			// replay the same sequence of keys
+			rnd.setSeed(seed);
+			for (int i = 0; i < numElements; i++) {
+				Integer key = rnd.nextInt();
+
+				// skip these, evaluating it is tricky due to duplicates
+				rnd.nextInt();
+				rnd.nextBoolean();
+
+				final String message = seedInfo + ", key=" + key;
+				Integer expected = groundTruth.get(key);
+				Integer contained = map.get(key);
+
+				if (expected == null) {
+					assertNull(message, contained);
+				}
+				else {
+					assertNotNull(message, contained);
+					assertEquals(message, expected, contained);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(seedInfo + ": " + e.getMessage());
+		}
+	}
+	
+	@Test // stores each key in 1-3 of 7 maps, then checks that KeyMap.traverseMaps reports every key with all of its values
+	public void testConjunctTraversal() {
+		try {
+			final Random rootRnd = new Random(654685486325439L); // fixed seed: the generated data is the same in every run
+			
+			final int numMaps = 7;
+			final int numKeys = 1000000;
+
+			// ------ create a set of maps ------
+			@SuppressWarnings("unchecked")
+			final KeyMap<Integer, Integer>[] maps = (KeyMap<Integer, Integer>[]) new KeyMap<?,
?>[numMaps];
+			for (int i = 0; i < numMaps; i++) {
+				maps[i] = new KeyMap<>();
+			}
+			
+			// ------ prepare probabilities for maps ------
+			final double[] probabilities = new double[numMaps];
+			final double[] probabilitiesTemp = new double[numMaps]; // per-key working copy; drawn entries are zeroed below
+			{
+				probabilities[0] = 0.5;
+				double remainingProb = 1.0 - probabilities[0];
+				for (int i = 1; i < numMaps - 1; i++) {
+					remainingProb /= 2; // each map gets half the weight of the previous one
+					probabilities[i] = remainingProb;
+				}
+
+				// the last map gets the same weight as the one before it, so that all weights add up to 1.0
+				probabilities[numMaps - 1] = remainingProb;
+			}
+			
+			// ------ generate random elements ------
+			final long probSeed = rootRnd.nextLong();
+			final long keySeed = rootRnd.nextLong(); // kept so that the key sequence can be replayed further down
+			
+			final Random probRnd = new Random(probSeed);
+			final Random keyRnd = new Random(keySeed);
+			
+			final int maxStride = Integer.MAX_VALUE / numKeys; // upper bound for key increments; keeps 1 + numKeys * maxStride below Integer.MAX_VALUE
+			
+			int totalNumElements = 0;
+			int nextKeyValue = 1;
+			
+			for (int i = 0; i < numKeys; i++) {
+				int numCopies = (nextKeyValue % 3) + 1; // the key value determines into how many maps (1, 2 or 3) the key is put
+				System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps);
+				
+				double totalProb = 1.0;
+				for (int copy = 0; copy < numCopies; copy++) {
+					int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd);
+					totalProb -= probabilitiesTemp[pos];
+					probabilitiesTemp[pos] = 0.0; // the same map must not be drawn again for this key
+					
+					Integer boxed = nextKeyValue;
+					Integer previous = maps[pos].put(boxed, boxed); // the key is also used as the value
+					assertNull("Test problem - test does not assign unique maps", previous);
+				}
+				
+				totalNumElements += numCopies;
+				nextKeyValue += keyRnd.nextInt(maxStride) + 1; // strictly increasing, so every key is unique
+			}
+			
+			
+			// check that the sizes of all maps add up to the total number of inserted elements
+			int numContained = 0;
+			for (KeyMap<?, ?> map : maps) {
+				numContained += map.size();
+			}
+			assertEquals(totalNumElements, numContained);
+
+			// ------ check that all elements can be found in the maps ------
+			keyRnd.setSeed(keySeed); // replay the key sequence used during insertion
+			
+			numContained = 0;
+			nextKeyValue = 1;
+			for (int i = 0; i < numKeys; i++) {
+				int numCopiesExpected = (nextKeyValue % 3) + 1;
+				int numCopiesContained = 0;
+				
+				for (KeyMap<Integer, Integer> map : maps) {
+					Integer val = map.get(nextKeyValue);
+					if (val != null) {
+						assertEquals(nextKeyValue, val.intValue());
+						numCopiesContained++;
+					}
+				}
+				
+				assertEquals(numCopiesExpected, numCopiesContained);
+				numContained += numCopiesContained;
+				
+				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
+			}
+			assertEquals(totalNumElements, numContained);
+
+			// ------ make a traversal over all keys and validate the keys in the traversal ------
+			final int[] keysStartedAndFinished = { 0, 0 }; // [0] counts startNewKey calls, [1] counts keyDone calls
+			KeyMap.TraversalEvaluator<Integer, Integer> traversal = new KeyMap.TraversalEvaluator<Integer,
Integer>() {
+
+				private int key; // key passed to the most recent startNewKey call
+				private int valueCount; // number of nextValue calls since the most recent startNewKey call
+				
+				@Override
+				public void startNewKey(Integer key) {
+					this.key = key;
+					this.valueCount = 0;
+					
+					keysStartedAndFinished[0]++;
+				}
+
+				@Override
+				public void nextValue(Integer value) {
+					assertEquals(this.key, value.intValue()); // values were inserted equal to their key
+					this.valueCount++;
+				}
+
+				@Override
+				public void keyDone() {
+					int expected = (key % 3) + 1; // same rule that decided the number of copies at insertion
+					if (expected != valueCount) {
+						fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount);
+					}
+					
+					keysStartedAndFinished[1]++;
+				}
+			};
+			
+			KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17); // maps are passed in shuffled order; NOTE(review): the meaning of the third argument (17) is not visible here - confirm against KeyMap.traverseMaps
+			
+			assertEquals(numKeys, keysStartedAndFinished[0]);
+			assertEquals(numKeys, keysStartedAndFinished[1]);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Checks the sign of KeyMap.CapacityDescendingComparator's result: zero for a
+	 * map compared with itself, positive when the first map has the smaller table
+	 * capacity, negative when it has the larger one.
+	 */
+	@Test
+	public void testSizeComparator() {
+		try {
+			final KeyMap<String, String> smaller = new KeyMap<>(5);
+			final KeyMap<String, String> larger = new KeyMap<>(80);
+
+			// precondition: the two maps really differ in capacity
+			final boolean capacitiesDiffer =
+					smaller.getCurrentTableCapacity() < larger.getCurrentTableCapacity();
+			assertTrue(capacitiesDiffer);
+
+			final int smallerWithItself = KeyMap.CapacityDescendingComparator.INSTANCE.compare(smaller, smaller);
+			assertTrue(smallerWithItself == 0);
+
+			final int largerWithItself = KeyMap.CapacityDescendingComparator.INSTANCE.compare(larger, larger);
+			assertTrue(largerWithItself == 0);
+
+			final int smallerFirst = KeyMap.CapacityDescendingComparator.INSTANCE.compare(smaller, larger);
+			assertTrue(smallerFirst > 0);
+
+			final int largerFirst = KeyMap.CapacityDescendingComparator.INSTANCE.compare(larger, smaller);
+			assertTrue(largerFirst < 0);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Draws an index of {@code array} with a probability proportional to the weight
+	 * stored at that index. An index whose weight is not positive is never returned
+	 * as long as at least one weight is positive.
+	 *
+	 * @param array the weights; the caller sets the weight of an already drawn index to 0.0
+	 * @param totalProbability the sum of the weights currently in {@code array}
+	 * @param rnd the source of randomness; exactly one double is drawn from it
+	 * @return the drawn index, or {@code array.length - 1} if no weight is positive
+	 */
+	private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) {
+		double val = rnd.nextDouble() * totalProbability;
+
+		double accum = 0;
+		int lastPositive = array.length - 1;
+		for (int i = 0; i < array.length; i++) {
+			accum += array[i];
+			if (array[i] > 0.0) {
+				if (val <= accum) {
+					return i;
+				}
+				lastPositive = i;
+			}
+		}
+
+		// Only reached through rounding errors in the accumulated sum. Fall back to the
+		// last index that still has a positive weight, not blindly to the last index,
+		// whose weight may already have been set to 0.0 by the caller.
+		return lastPositive;
+	}
+	
+	private static <E> E[] shuffleArray(E[] array, Random rnd) {
+		E[] target = Arrays.copyOf(array, array.length);
+		
+		for (int i = target.length - 1; i > 0; i--) {
+			int swapPos = rnd.nextInt(i + 1);
+			E temp = target[i];
+			target[i] = target[swapPos];
+			target[swapPos] = temp;
+		}
+		
+		return target;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
new file mode 100644
index 0000000..6e0724d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class GroupedProcessingTimeWindowExample {
+	
+	public static void main(String[] args) throws Exception {
+		// Job: a parallel source emits (key, 1) records, an AggregatingProcessingTimeWindowOperator combines them with SummingReducer, a sink drops the results
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		
+		DataStream<Tuple2<Long, Long>> stream = env
+				.addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
+					
+					private volatile boolean running = true; // set to false by cancel() to end the emit loop
+					
+					@Override
+					public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception
{
+						
+						final long startTime = System.currentTimeMillis();
+						
+						final long numElements = 20000000; // upper bound on the records emitted by this run() call
+						final long numKeys = 10000; // first tuple field cycles through 1..numKeys
+						long val = 1L;
+						long count = 0L;
+						
+						
+						while (running && count < numElements) {
+							count++;
+							ctx.collect(new Tuple2<Long, Long>(val++, 1L));
+							
+							if (val > numKeys) {
+								val = 1L; // wrap around to the first key
+							}
+						}
+
+						final long endTime = System.currentTimeMillis();
+						System.out.println("Took " + (endTime-startTime) + " msecs for " + numElements + "
values"); // NOTE(review): prints numElements even if cancel() ended the loop early; count holds the number actually emitted
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+		
+		stream
+//				.groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+//				.window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
+//				.reduceWindow(new SummingReducer())
+//				.flatten()
+//		.partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+//		.transform(
+//				"Aligned time window",
+//				TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
+//				new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long,
Long>>(
+//						new SummingWindowFunction<Long>(),
+//						new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
+//						2500, 500))
+			.transform(
+				"Aligned time window",
+				TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
+				new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>(
+						new SummingReducer(),
+						new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
+						2500, 500)) // presumably window length 2500 ms and slide 500 ms, cf. the commented-out window(...).every(...) variant - confirm against the operator's constructor
+				
+			.addSink(new SinkFunction<Tuple2<Long, Long>>() {
+					@Override
+					public void invoke(Tuple2<Long, Long> value) { // intentionally empty: results are discarded
+			}
+		});
+		
+		env.execute();
+	}
+	
+	/**
+	 * Key selector that returns field 0 of a tuple as the key.
+	 *
+	 * @param <Type> the tuple type the key is taken from
+	 * @param <Key> the type that field 0 is cast to
+	 */
+	public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> {
+
+		/**
+		 * Returns field 0 of the given tuple, cast (unchecked) to the key type.
+		 */
+		@Override
+		@SuppressWarnings("unchecked")
+		public Key getKey(Type value) {
+			final Object firstField = value.getField(0);
+			return (Key) firstField;
+		}
+	}
+
+	/**
+	 * Key selector that uses the element itself as the key.
+	 *
+	 * @param <T> the type of both the element and the key
+	 */
+	public static class IdentityKeyExtractor<T> implements KeySelector<T, T> {
+
+		/**
+		 * Returns the given element unchanged.
+		 */
+		@Override
+		public T getKey(final T value) {
+			return value;
+		}
+	}
+
+	/**
+	 * Window function that forwards every value it is given to the collector.
+	 *
+	 * @param <K> the key type (the key is not used)
+	 * @param <T> the type of the input values and of the emitted values
+	 */
+	public static class IdentityWindowFunction<K, T> implements KeyedWindowFunction<K, T, T> {
+
+		/**
+		 * Emits each of the given values, in iteration order.
+		 */
+		@Override
+		public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception {
+			for (T element : values) {
+				out.collect(element);
+			}
+		}
+	}
+	
+	/**
+	 * Window function that emits the number of values it is given.
+	 *
+	 * @param <K> the key type (the key is not used)
+	 * @param <T> the type of the input values
+	 */
+	public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> {
+
+		/**
+		 * Iterates over all values once and emits their count as a single record.
+		 */
+		@Override
+		public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception {
+			long numValues = 0;
+
+			final java.util.Iterator<T> remaining = values.iterator();
+			while (remaining.hasNext()) {
+				remaining.next();
+				numValues++;
+			}
+
+			out.collect(numValues);
+		}
+	}
+
+	public static class SummingWindowFunction<K> implements KeyedWindowFunction<K,
Tuple2<K, Long>, Tuple2<K, Long>> {
+
+		@Override
+		public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K,
Long>> out) throws Exception {
+			long sum = 0L;
+			for (Tuple2<K, Long> value : values) {
+				sum += value.f1;
+			}
+
+			out.collect(new Tuple2<K, Long>(key, sum));
+		}
+	}
+
+	/**
+	 * Reduce function that combines two tuples into one that keeps the first
+	 * field of the first tuple and holds the sum of both second fields.
+	 */
+	public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
+
+		/**
+		 * Returns a new tuple {@code (value1.f0, value1.f1 + value2.f1)}.
+		 */
+		@Override
+		public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
+			final Long key = value1.f0;
+			final long sum = value1.f1 + value2.f1;
+			return new Tuple2<>(key, sum);
+		}
+	}
+}


Mime
View raw message