flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [31/36] flink git commit: [streaming] Deleted obsolete parts of the connected stream api
Date Wed, 07 Jan 2015 14:13:10 GMT
[streaming] Deleted obsolete parts of the connected stream api


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd28fcc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd28fcc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd28fcc

Branch: refs/heads/release-0.8
Commit: fcd28fccd38a070f69676f48b2038af9c6e5eb4e
Parents: fb86fde
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Jan 5 15:59:50 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jan 6 00:22:58 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/CoBatchedDataStream.java     | 126 --------
 .../api/datastream/CoWindowDataStream.java      | 112 -------
 .../api/datastream/ConnectedDataStream.java     | 167 ----------
 .../operator/co/CoBatchReduceInvokable.java     | 316 -------------------
 .../co/CoGroupedBatchReduceInvokable.java       |  79 -----
 .../co/CoGroupedWindowReduceInvokable.java      | 158 ----------
 .../operator/co/CoWindowReduceInvokable.java    | 189 -----------
 .../invokable/operator/CoBatchReduceTest.java   | 137 --------
 .../operator/CoGroupedBatchReduceTest.java      | 172 ----------
 .../operator/CoGroupedWindowReduceTest.java     | 211 -------------
 .../invokable/operator/CoWindowReduceTest.java  | 172 ----------
 11 files changed, 1839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
deleted file mode 100644
index 3b58188..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-
-/**
- * A {@link CoBatchedDataStream} represents a two data stream whose elements are
- * batched together in sliding batches. Operation
- * {@link #reduce(ReduceFunction)} can be applied for each batch and the batch
- * is slid afterwards.
- *
- * @param <IN1>
- *            The type of the first input data stream
- * @param <IN2>
- *            The type of the second input data stream
- */
-public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
-
-	protected long batchSize1;
-	protected long batchSize2;
-	protected long slideSize1;
-	protected long slideSize2;
-
-	protected CoBatchedDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
-			long batchSize1, long batchSize2, long slideSize1, long slideSize2) {
-		super(dataStream1, dataStream2);
-		this.batchSize1 = batchSize1;
-		this.batchSize2 = batchSize2;
-		this.slideSize1 = slideSize1;
-		this.slideSize2 = slideSize2;
-	}
-
-	protected CoBatchedDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long batchSize1,
-			long batchSize2, long slideSize1, long slideSize2) {
-		super(coDataStream);
-		this.batchSize1 = batchSize1;
-		this.batchSize2 = batchSize2;
-		this.slideSize1 = slideSize1;
-		this.slideSize2 = slideSize2;
-	}
-
-	protected CoBatchedDataStream(CoBatchedDataStream<IN1, IN2> coBatchedDataStream) {
-		super(coBatchedDataStream);
-		this.batchSize1 = coBatchedDataStream.batchSize1;
-		this.batchSize2 = coBatchedDataStream.batchSize2;
-		this.slideSize1 = coBatchedDataStream.slideSize1;
-		this.slideSize2 = coBatchedDataStream.slideSize2;
-	}
-
-	/**
-	 * Groups the elements of the {@link CoBatchedDataStream} by the given key
-	 * positions to be used with grouped operators.
-	 * 
-	 * @param keyPosition1
-	 *            The position of the field on which the first input data stream
-	 *            will be grouped.
-	 * @param keyPosition2
-	 *            The position of the field on which the second input data
-	 *            stream will be grouped.
-	 * @return The transformed {@link CoBatchedDataStream}
-	 */
-	public CoBatchedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
-		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
-				dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
-		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
-				dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
-		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
-				dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
-		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
-				dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
-			KeySelector<IN2, ?> keySelector2) {
-		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
-				dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2);
-	}
-
-	@Override
-	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-			CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
-		if (isGrouped) {
-			invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
-					batchSize2, slideSize1, slideSize2, keySelector1, keySelector2);
-		} else {
-			invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
-					batchSize2, slideSize1, slideSize2);
-		}
-		return invokable;
-	}
-
-	protected CoBatchedDataStream<IN1, IN2> copy() {
-		return new CoBatchedDataStream<IN1, IN2>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
deleted file mode 100644
index 9129f9e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * A {@link CoWindowDataStream} represents two data streams whose elements are
- * batched together into sliding windows. Operation
- * {@link #reduce(CoReduceFunction)} can be applied for each window.
- * 
- * @param <IN1>
- *            The type of the first input data stream
- * @param <IN2>
- *            The type of the second input data stream
- */
-public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> {
-	TimestampWrapper<IN1> timeStamp1;
-	TimestampWrapper<IN2> timeStamp2;
-
-	protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
-			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
-			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2);
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-	}
-
-	protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1,
-			long windowSize2, long slideInterval1, long slideInterval2,
-			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2);
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-	}
-
-	protected CoWindowDataStream(CoWindowDataStream<IN1, IN2> coWindowDataStream) {
-		super(coWindowDataStream);
-		this.timeStamp1 = coWindowDataStream.timeStamp1;
-		this.timeStamp2 = coWindowDataStream.timeStamp2;
-	}
-
-	public CoWindowDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
-		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
-				dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2,
-				timeStamp1, timeStamp2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
-		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
-				dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2,
-				timeStamp1, timeStamp2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
-		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(field1),
-				dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2,
-				timeStamp1, timeStamp2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
-		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
-				dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2,
-				timeStamp1, timeStamp2);
-	}
-
-	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
-			KeySelector<IN2, ?> keySelector2) {
-		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
-				dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2,
-				timeStamp1, timeStamp2);
-	}
-
-	@Override
-	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-			CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
-		if (isGrouped) {
-			invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer),
-					batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2,
-					timeStamp1, timeStamp2);
-		} else {
-			invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
-					batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
-		}
-		return invokable;
-	}
-
-	protected CoWindowDataStream<IN1, IN2> copy() {
-		return new CoWindowDataStream<IN1, IN2>(this);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index efd9531..a0c8ff8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -227,173 +227,6 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
-	 * Batch operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into sliding batches creating a new
-	 * {@link CoBatchedDataStream}. Then the user can apply
-	 * {@link CoBatchedDataStream#reduce} transformation on the
-	 * {@link CoBatchedDataStream}.
-	 * 
-	 * @param batchSize1
-	 *            The number of elements in each batch of the first input data
-	 *            stream
-	 * @param batchSize2
-	 *            The number of elements in each batch of the second input data
-	 *            stream
-	 * @param slideSize1
-	 *            The number of elements with which the batches of the first
-	 *            input data stream are slid by after each transformation.
-	 * @param slideSize2
-	 *            The number of elements with which the batches of the second
-	 *            input data stream are slid by after each transformation.
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2, long slideSize1,
-			long slideSize2) {
-		if (batchSize1 < 1 || batchSize2 < 1) {
-			throw new IllegalArgumentException("Batch size must be positive");
-		}
-		if (slideSize1 < 1 || slideSize2 < 1) {
-			throw new IllegalArgumentException("Slide size must be positive");
-		}
-		return new CoBatchedDataStream<IN1, IN2>(this, batchSize1, batchSize2, slideSize1,
-				slideSize2);
-	}
-
-	/**
-	 * Batch operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into batches creating a new
-	 * {@link CoBatchedDataStream}. Then the user can apply
-	 * {@link CoBatchedDataStream#reduce} transformation on the
-	 * {@link CoBatchedDataStream}.
-	 * 
-	 * @param batchSize1
-	 *            The number of elements in each batch of the first input data
-	 *            stream
-	 * @param batchSize2
-	 *            The number of elements in each batch of the second input data
-	 *            stream
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2) {
-		return batch(batchSize1, batchSize2, batchSize1, batchSize2);
-	}
-
-	/**
-	 * Window operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into sliding windows creating a new
-	 * {@link CoWindowDataStream}. Then the user can apply
-	 * {@link WindowDataStream#reduce} transformation on the
-	 * {@link CoWindowDataStream}. The user can implement their own time stamps
-	 * or use the system time by default.
-	 * 
-	 * @param windowSize1
-	 *            The length of the window of the first input data stream
-	 * @param windowSize2
-	 *            The length of the window of the second input data stream
-	 * @param slideInterval1
-	 *            The number of milliseconds with which the windows of the first
-	 *            input data stream are slid by after each transformation
-	 * @param slideInterval2
-	 *            The number of milliseconds with which the windows of the
-	 *            second input data stream are slid by after each transformation
-	 * @param timeStamp1
-	 *            User defined function for extracting time-stamps from each
-	 *            element of the first input data stream
-	 * @param timeStamp2
-	 *            User defined function for extracting time-stamps from each
-	 *            element of the second input data stream
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
-			long slideInterval1, long slideInterval2, TimestampWrapper<IN1> timeStamp1,
-			TimestampWrapper<IN2> timeStamp2) {
-		if (windowSize1 < 1 || windowSize2 < 1) {
-			throw new IllegalArgumentException("Window size must be positive");
-		}
-		if (slideInterval1 < 1 || slideInterval2 < 1) {
-			throw new IllegalArgumentException("Slide interval must be positive");
-		}
-		return new CoWindowDataStream<IN1, IN2>(this, windowSize1, windowSize2, slideInterval1,
-				slideInterval2, timeStamp1, timeStamp2);
-	}
-
-	/**
-	 * Window operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into sliding windows creating a new
-	 * {@link CoWindowDataStream}. Then the user can apply
-	 * {@link WindowDataStream#reduce} transformation on the
-	 * {@link CoWindowDataStream}.
-	 * 
-	 * @param windowSize1
-	 *            The length of the window of the first input data stream in
-	 *            milliseconds.
-	 * @param windowSize2
-	 *            The length of the window of the second input data stream in
-	 *            milliseconds.
-	 * @param slideInterval1
-	 *            The number of milliseconds with which the windows of the first
-	 *            input data stream are slid by after each transformation
-	 * @param slideInterval2
-	 *            The number of milliseconds with which the windows of the
-	 *            second input data stream are slid by after each transformation
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	@SuppressWarnings("unchecked")
-	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
-			long slideInterval1, long slideInterval2) {
-		return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
-				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
-				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Window operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into windows creating a new
-	 * {@link CoWindowDataStream}. Then the user can apply
-	 * {@link WindowDataStream#reduce} transformation on the
-	 * {@link CoWindowDataStream}. The user can implement their own time stamps
-	 * or use the system time by default.
-	 * 
-	 * @param windowSize1
-	 *            The length of the window of the first input data stream
-	 * @param windowSize2
-	 *            The length of the window of the second input data stream
-	 * @param timeStamp1
-	 *            User defined function for extracting time-stamps from each
-	 *            element of the first input data stream
-	 * @param timeStamp2
-	 *            User defined function for extracting time-stamps from each
-	 *            element of the second input data stream
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
-			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2);
-	}
-
-	/**
-	 * Window operation for connected data stream. Collects each input data
-	 * stream's elements simultaneously into windows creating a new
-	 * {@link CoWindowDataStream}. Then the user can apply
-	 * {@link WindowDataStream#reduce} transformation on the
-	 * {@link CoWindowDataStream}.
-	 * 
-	 * @param windowSize1
-	 *            The length of the window of the first input data stream in
-	 *            milliseconds
-	 * @param windowSize2
-	 *            The length of the window of the second input data stream in
-	 *            milliseconds
-	 * @return The transformed {@link ConnectedDataStream}
-	 */
-	@SuppressWarnings("unchecked")
-	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
-		return window(windowSize1, windowSize2, windowSize1, windowSize2,
-				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
-				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
-	}
-
-	/**
 	 * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
 	 * the output to a common type. The transformation calls a
 	 * {@link CoMapFunction#map1} for each element of the first input and

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
deleted file mode 100644
index 4ed49fd..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.NullableCircularBuffer;
-
-public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-
-	protected long slideSize1;
-	protected long slideSize2;
-	protected long batchSize1;
-	protected long batchSize2;
-	protected int granularity1;
-	protected int granularity2;
-	protected long batchPerSlide1;
-	protected long batchPerSlide2;
-	protected long numberOfBatches1;
-	protected long numberOfBatches2;
-	protected StreamBatch<IN1> batch1;
-	protected StreamBatch<IN2> batch2;
-	protected StreamBatch<IN1> currentBatch1;
-	protected StreamBatch<IN2> currentBatch2;
-
-	public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1,
-			long batchSize2, long slideSize1, long slideSize2) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		this.batchSize1 = batchSize1;
-		this.batchSize2 = batchSize2;
-		this.slideSize1 = slideSize1;
-		this.slideSize2 = slideSize2;
-		this.granularity1 = (int) MathUtils.gcd(batchSize1, slideSize1);
-		this.granularity2 = (int) MathUtils.gcd(batchSize2, slideSize2);
-		this.batchPerSlide1 = slideSize1 / granularity1;
-		this.batchPerSlide2 = slideSize2 / granularity2;
-		this.numberOfBatches1 = batchSize1 / granularity1;
-		this.numberOfBatches2 = batchSize2 / granularity2;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (true) {
-			int next = recordIterator.next(reuse1, reuse2);
-			if (next == 0) {
-				reduceLastBatch1();
-				reduceLastBatch2();
-				break;
-			} else if (next == 1) {
-				handleStream1();
-				resetReuse1();
-			} else {
-				handleStream2();
-				resetReuse2();
-			}
-		}
-	}
-
-	@Override
-	protected void handleStream1() throws Exception {
-		StreamBatch<IN1> batch1 = getBatch1(reuse1);
-		reduceToBuffer1(reuse1.getObject(), batch1);
-	}
-
-	@Override
-	protected void handleStream2() throws Exception {
-		StreamBatch<IN2> batch2 = getBatch2(reuse2);
-		reduceToBuffer2(reuse2.getObject(), batch2);
-	}
-
-	protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-		return batch1;
-	}
-
-	protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-		return batch2;
-	}
-
-	protected void reduce1(StreamBatch<IN1> batch) {
-		this.currentBatch1 = batch;
-		callUserFunctionAndLogException1();
-	}
-
-	protected void reduce2(StreamBatch<IN2> batch) {
-		this.currentBatch2 = batch;
-		callUserFunctionAndLogException2();
-	}
-
-	protected void reduceLastBatch1() throws Exception {
-		reduceLastBatch1(batch1);
-	}
-
-	protected void reduceLastBatch2() throws Exception {
-		reduceLastBatch2(batch2);
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		Iterator<IN1> reducedIterator = currentBatch1.getIterator();
-		IN1 reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			IN1 next = reducedIterator.next();
-			if (next != null) {
-				reduced = coReducer.reduce1(serializer1.copy(reduced), serializer1.copy(next));
-			}
-		}
-		if (reduced != null) {
-			collector.collect(coReducer.map1(serializer1.copy(reduced)));
-		}
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		Iterator<IN2> reducedIterator = currentBatch2.getIterator();
-		IN2 reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			IN2 next = reducedIterator.next();
-			if (next != null) {
-				reduced = coReducer.reduce2(serializer2.copy(reduced), serializer2.copy(next));
-			}
-		}
-		if (reduced != null) {
-			collector.collect(coReducer.map2(serializer2.copy(reduced)));
-		}
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
-		this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
-	}
-
-	public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> streamBatch) throws Exception {
-
-		if (streamBatch.currentValue != null) {
-			streamBatch.currentValue = coReducer.reduce1(
-					serializer1.copy(streamBatch.currentValue), serializer1.copy(nextValue));
-		} else {
-			streamBatch.currentValue = nextValue;
-		}
-
-		streamBatch.counter++;
-
-		if (streamBatch.miniBatchEnd()) {
-			streamBatch.addToBuffer();
-			if (streamBatch.batchEnd()) {
-				reduceBatch1(streamBatch);
-			}
-		}
-	}
-
-	public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> streamBatch) throws Exception {
-
-		if (streamBatch.currentValue != null) {
-			streamBatch.currentValue = coReducer.reduce2(
-					serializer2.copy(streamBatch.currentValue), serializer2.copy(nextValue));
-		} else {
-			streamBatch.currentValue = nextValue;
-		}
-
-		streamBatch.counter++;
-
-		if (streamBatch.miniBatchEnd()) {
-			streamBatch.addToBuffer();
-			if (streamBatch.batchEnd()) {
-				reduceBatch2(streamBatch);
-			}
-		}
-	}
-
-	public void reduceLastBatch1(StreamBatch<IN1> streamBatch) throws Exception {
-		if (streamBatch.miniBatchInProgress()) {
-			streamBatch.addToBuffer();
-		}
-
-		if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
-			if (streamBatch.circularBuffer.isFull()) {
-				for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) {
-					if (!streamBatch.circularBuffer.isEmpty()) {
-						streamBatch.circularBuffer.remove();
-					}
-				}
-			}
-			if (!streamBatch.circularBuffer.isEmpty()) {
-				reduce1(streamBatch);
-			}
-		}
-
-	}
-
-	public void reduceLastBatch2(StreamBatch<IN2> streamBatch) throws Exception {
-		if (streamBatch.miniBatchInProgress()) {
-			streamBatch.addToBuffer();
-		}
-
-		if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
-			if (streamBatch.circularBuffer.isFull()) {
-				for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
-					if (!streamBatch.circularBuffer.isEmpty()) {
-						streamBatch.circularBuffer.remove();
-					}
-				}
-			}
-			if (!streamBatch.circularBuffer.isEmpty()) {
-				reduce2(streamBatch);
-			}
-		}
-
-	}
-
-	public void reduceBatch1(StreamBatch<IN1> streamBatch) {
-		reduce1(streamBatch);
-		streamBatch.changed = false;
-	}
-
-	public void reduceBatch2(StreamBatch<IN2> streamBatch) {
-		reduce2(streamBatch);
-		streamBatch.changed = false;
-	}
-
-	protected class StreamBatch<IN> implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		protected long counter;
-		protected long minibatchCounter;
-		protected IN currentValue;
-		protected long batchSize;
-		protected long slideSize;
-		protected long granularity;
-		protected long batchPerSlide;
-		protected long numberOfBatches;
-		boolean changed;
-
-		protected NullableCircularBuffer circularBuffer;
-
-		public StreamBatch(long batchSize, long slideSize) {
-			this.batchSize = batchSize;
-			this.slideSize = slideSize;
-			this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
-			this.batchPerSlide = slideSize / granularity;
-			this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
-			this.counter = 0;
-			this.minibatchCounter = 0;
-			this.currentValue = null;
-			this.numberOfBatches = batchSize / granularity;
-			this.changed = false;
-
-		}
-
-		protected void addToBuffer() {
-			circularBuffer.add(currentValue);
-			changed = true;
-			minibatchCounter++;
-			currentValue = null;
-		}
-
-		protected boolean miniBatchEnd() {
-			return (counter % granularity) == 0;
-		}
-
-		public boolean batchEnd() {
-			if (counter == batchSize) {
-				counter -= slideSize;
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public boolean miniBatchInProgress() {
-			return currentValue != null;
-		}
-
-		@SuppressWarnings("unchecked")
-		public Iterator<IN> getIterator() {
-			return circularBuffer.iterator();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
deleted file mode 100644
index 745f507..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class CoGroupedBatchReduceInvokable<IN1, IN2, OUT> extends
-		CoBatchReduceInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-	Map<Object, StreamBatch<IN1>> streamBatches1;
-	Map<Object, StreamBatch<IN2>> streamBatches2;
-
-	public CoGroupedBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
-			long batchSize1, long batchSize2, long slideSize1, long slideSize2,
-			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		super(coReducer, batchSize1, batchSize2, slideSize1, slideSize2);
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		this.streamBatches1 = new HashMap<Object, StreamBatch<IN1>>();
-		this.streamBatches2 = new HashMap<Object, StreamBatch<IN2>>();
-	}
-
-	protected void reduceLastBatch1() throws Exception {
-		for (StreamBatch<IN1> batch : streamBatches1.values()) {
-			reduceLastBatch1(batch);
-		}
-	}
-
-	protected void reduceLastBatch2() throws Exception {
-		for (StreamBatch<IN2> batch : streamBatches2.values()) {
-			reduceLastBatch2(batch);
-		}
-	}
-
-	@Override
-	protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-		Object key = next.getKey(keySelector1);
-		StreamBatch<IN1> batch = streamBatches1.get(key);
-		if (batch == null) {
-			batch = new StreamBatch<IN1>(batchSize1, slideSize1);
-			streamBatches1.put(key, batch);
-		}
-		return batch;
-	}
-
-	@Override
-	protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-		Object key = next.getKey(keySelector2);
-		StreamBatch<IN2> batch = streamBatches2.get(key);
-		if (batch == null) {
-			batch = new StreamBatch<IN2>(batchSize2, slideSize2);
-			streamBatches2.put(key, batch);
-		}
-		return batch;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
deleted file mode 100644
index 736239f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
-		CoWindowReduceInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-	private Map<Object, StreamWindow<IN1>> streamWindows1;
-	private Map<Object, StreamWindow<IN2>> streamWindows2;
-	private long currentMiniBatchCount1 = 0;
-	private long currentMiniBatchCount2 = 0;
-
-	public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
-			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
-			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
-			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
-		super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
-				timestamp2);
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		this.streamWindows1 = new HashMap<Object, StreamWindow<IN1>>();
-		this.streamWindows2 = new HashMap<Object, StreamWindow<IN2>>();
-	}
-
-	@Override
-	protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-		Object key = next.getKey(keySelector1);
-		StreamWindow<IN1> window = streamWindows1.get(key);
-		if (window == null) {
-			window = new GroupedStreamWindow<IN1>(batchSize1, slideSize1);
-			window.minibatchCounter = currentMiniBatchCount1;
-			streamWindows1.put(key, window);
-		}
-		this.window1 = window;
-		return window;
-	}
-
-	@Override
-	protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-		Object key = next.getKey(keySelector2);
-		StreamWindow<IN2> window = streamWindows2.get(key);
-		if (window == null) {
-			window = new GroupedStreamWindow<IN2>(batchSize2, slideSize2);
-			window.minibatchCounter = currentMiniBatchCount2;
-			streamWindows2.put(key, window);
-		}
-		this.window2 = window;
-		return window;
-	}
-
-	private void addToAllBuffers1() {
-		for (StreamBatch<IN1> window : streamWindows1.values()) {
-			window.addToBuffer();
-		}
-	}
-
-	private void addToAllBuffers2() {
-		for (StreamBatch<IN2> window : streamWindows2.values()) {
-			window.addToBuffer();
-		}
-	}
-
-	private void reduceAllWindows1() {
-		for (StreamBatch<IN1> window : streamWindows1.values()) {
-			window.minibatchCounter -= batchPerSlide1;
-			reduceBatch1((StreamBatch<IN1>) window);
-		}
-	}
-
-	private void reduceAllWindows2() {
-		for (StreamBatch<IN2> window : streamWindows2.values()) {
-			window.minibatchCounter -= batchPerSlide2;
-			reduceBatch2((StreamBatch<IN2>) window);
-		}
-	}
-
-	@Override
-	protected void reduceLastBatch1() throws Exception {
-		for (StreamBatch<IN1> window : streamWindows1.values()) {
-			reduceLastBatch1((StreamBatch<IN1>) window);
-		}
-	}
-
-	@Override
-	protected void reduceLastBatch2() throws Exception {
-		for (StreamBatch<IN2> window : streamWindows2.values()) {
-			reduceLastBatch2((StreamBatch<IN2>) window);
-		}
-	}
-
-	@Override
-	protected synchronized void checkWindowEnd1(long timeStamp, StreamWindow<IN1> streamWindow) {
-		nextRecordTime1 = timeStamp;
-
-		while (miniBatchEnd1()) {
-			addToAllBuffers1();
-			if (streamWindow.batchEnd()) {
-				reduceAllWindows1();
-			}
-		}
-		currentMiniBatchCount1 = streamWindow.minibatchCounter;
-	}
-
-	@Override
-	protected synchronized void checkWindowEnd2(long timeStamp, StreamWindow<IN2> streamWindow) {
-		nextRecordTime2 = timeStamp;
-
-		while (miniBatchEnd2()) {
-			addToAllBuffers2();
-			if (streamWindow.batchEnd()) {
-				reduceAllWindows2();
-			}
-		}
-		currentMiniBatchCount2 = streamWindow.minibatchCounter;
-	}
-
-	protected class GroupedStreamWindow<IN> extends StreamWindow<IN> {
-		private static final long serialVersionUID = 1L;
-
-		public GroupedStreamWindow(long windowSize, long slideInterval) {
-			super(windowSize, slideInterval);
-		}
-
-		@Override
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				return true;
-			}
-			return false;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
deleted file mode 100644
index 0c8598f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-	protected long startTime1;
-	protected long startTime2;
-	protected long nextRecordTime1;
-	protected long nextRecordTime2;
-	protected TimestampWrapper<IN1> timestamp1;
-	protected TimestampWrapper<IN2> timestamp2;
-	protected StreamWindow<IN1> window1;
-	protected StreamWindow<IN2> window2;
-
-	public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
-			long windowSize2, long slideInterval1, long slideInterval2,
-			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
-		super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2);
-		this.timestamp1 = timestamp1;
-		this.timestamp2 = timestamp2;
-		this.startTime1 = timestamp1.getStartTime();
-		this.startTime2 = timestamp2.getStartTime();
-
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		this.window1 = new StreamWindow<IN1>(batchSize1, slideSize1);
-		this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2);
-		this.batch1 = this.window1;
-		this.batch2 = this.window2;
-		if (timestamp1.isDefaultTimestamp()) {
-			(new TimeCheck1()).start();
-		}
-		if (timestamp2.isDefaultTimestamp()) {
-			(new TimeCheck2()).start();
-		}
-	}
-
-	@Override
-	public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> streamWindow) throws Exception {
-
-		checkWindowEnd1(timestamp1.getTimestamp(nextValue), (StreamWindow<IN1>) streamWindow);
-
-		if (streamWindow.currentValue != null) {
-			streamWindow.currentValue = coReducer.reduce1(
-					serializer1.copy(streamWindow.currentValue), serializer1.copy(nextValue));
-		} else {
-			streamWindow.currentValue = nextValue;
-		}
-	}
-
-	@Override
-	public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> streamWindow) throws Exception {
-
-		checkWindowEnd2(timestamp2.getTimestamp(nextValue), (StreamWindow<IN2>) streamWindow);
-
-		if (streamWindow.currentValue != null) {
-			streamWindow.currentValue = coReducer.reduce2(
-					serializer2.copy(streamWindow.currentValue), serializer2.copy(nextValue));
-		} else {
-			streamWindow.currentValue = nextValue;
-		}
-	}
-
-	protected synchronized void checkWindowEnd1(long timeStamp, StreamWindow<IN1> streamWindow) {
-		nextRecordTime1 = timeStamp;
-
-		while (miniBatchEnd1()) {
-			streamWindow.addToBuffer();
-			if (streamWindow.batchEnd()) {
-				reduceBatch1(streamWindow);
-			}
-		}
-	}
-
-	protected synchronized void checkWindowEnd2(long timeStamp, StreamWindow<IN2> streamWindow) {
-		nextRecordTime2 = timeStamp;
-
-		while (miniBatchEnd2()) {
-			streamWindow.addToBuffer();
-			if (streamWindow.batchEnd()) {
-				reduceBatch2(streamWindow);
-			}
-		}
-	}
-
-	protected boolean miniBatchEnd1() {
-		if (nextRecordTime1 < startTime1 + granularity1) {
-			return false;
-		} else {
-			startTime1 += granularity1;
-			return true;
-		}
-	}
-
-	protected boolean miniBatchEnd2() {
-		if (nextRecordTime2 < startTime2 + granularity2) {
-			return false;
-		} else {
-			startTime2 += granularity2;
-			return true;
-		}
-	}
-
-	@Override
-	public void reduceBatch1(StreamBatch<IN1> streamBatch) {
-		reduce1(streamBatch);
-	}
-
-	@Override
-	public void reduceBatch2(StreamBatch<IN2> streamBatch) {
-		reduce2(streamBatch);
-	}
-
-	protected class StreamWindow<IN> extends StreamBatch<IN> {
-		private static final long serialVersionUID = 1L;
-
-		public StreamWindow(long windowSize, long slideInterval) {
-			super(windowSize, slideInterval);
-		}
-
-		@Override
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-	}
-
-	private class TimeCheck1 extends Thread {
-		@Override
-		public void run() {
-			while (true) {
-				try {
-					Thread.sleep(slideSize1);
-				} catch (InterruptedException e) {
-				}
-				if (isRunning) {
-					checkWindowEnd1(System.currentTimeMillis(), window1);
-				} else {
-					break;
-				}
-			}
-		}
-	}
-
-	private class TimeCheck2 extends Thread {
-		@Override
-		public void run() {
-			while (true) {
-				try {
-					Thread.sleep(slideSize2);
-				} catch (InterruptedException e) {
-				}
-				if (isRunning) {
-					checkWindowEnd2(System.currentTimeMillis(), window2);
-				} else {
-					break;
-				}
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
deleted file mode 100644
index 1db286c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoBatchReduceTest {
-
-	private static class MyCoReduceFunction implements CoReduceFunction<Integer, String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public String map1(Integer value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(String value) {
-			return value;
-		}
-	}
-
-	@Test
-	public void coBatchReduceTest1() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		for (Integer i = 1; i <= 10; i++) {
-			inputs.add(i);
-		}
-
-		List<String> inputs2 = new ArrayList<String>();
-		inputs2.add("a");
-		inputs2.add("b");
-		inputs2.add("c");
-		inputs2.add("d");
-		inputs2.add("e");
-		inputs2.add("f");
-		inputs2.add("g");
-		inputs2.add("h");
-		inputs2.add("i");
-
-		CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L);
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("10");
-		expected.add("26");
-		expected.add("19");
-		expected.add("abc");
-		expected.add("def");
-		expected.add("ghi");
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-
-		assertEquals(expected, result);
-
-	}
-
-	@Test
-	public void coBatchReduceTest2() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		for (Integer i = 1; i <= 10; i++) {
-			inputs.add(i);
-		}
-
-		List<String> inputs2 = new ArrayList<String>();
-		inputs2.add("a");
-		inputs2.add("b");
-		inputs2.add("c");
-		inputs2.add("d");
-		inputs2.add("e");
-		inputs2.add("f");
-		inputs2.add("g");
-		inputs2.add("h");
-		inputs2.add("i");
-
-		CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L);
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("10");
-		expected.add("18");
-		expected.add("26");
-		expected.add("34");
-		expected.add("abc");
-		expected.add("cde");
-		expected.add("efg");
-		expected.add("ghi");
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-
-		assertEquals(expected, result);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
deleted file mode 100644
index 1d0732c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedBatchReduceTest {
-
-	KeySelector<Tuple2<String, String>, ?> keySelector1 = new KeySelector<Tuple2<String, String>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, String> value) throws Exception {
-			return value.f0;
-		}
-	};
-
-	KeySelector<Tuple2<String, Integer>, ?> keySelector2 = new KeySelector<Tuple2<String, Integer>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	};
-
-	private static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) {
-			return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
-		}
-
-		@Override
-		public Tuple2<String, String> reduce2(Tuple2<String, String> value1,
-				Tuple2<String, String> value2) {
-			return new Tuple2<String, String>("a", value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple2<String, Integer> value) {
-			return value.f1.toString();
-		}
-
-		@Override
-		public String map2(Tuple2<String, String> value) {
-			return value.f1;
-		}
-	}
-
-	@Test
-	public void coGroupedBatchReduceTest1() {
-
-		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
-		inputs1.add(new Tuple2<String, Integer>("a", 1));
-		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("a", 3));
-		inputs1.add(new Tuple2<String, Integer>("a", 4));
-		inputs1.add(new Tuple2<String, Integer>("a", 5));
-		inputs1.add(new Tuple2<String, Integer>("b", 6));
-		inputs1.add(new Tuple2<String, Integer>("a", 7));
-		inputs1.add(new Tuple2<String, Integer>("b", 8));
-		inputs1.add(new Tuple2<String, Integer>("b", 9));
-		inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
-		inputs2.add(new Tuple2<String, String>("1", "a"));
-		inputs2.add(new Tuple2<String, String>("2", "b"));
-		inputs2.add(new Tuple2<String, String>("1", "c"));
-		inputs2.add(new Tuple2<String, String>("2", "d"));
-		inputs2.add(new Tuple2<String, String>("1", "e"));
-		inputs2.add(new Tuple2<String, String>("2", "f"));
-		inputs2.add(new Tuple2<String, String>("1", "g"));
-		inputs2.add(new Tuple2<String, String>("2", "h"));
-		inputs2.add(new Tuple2<String, String>("1", "i"));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("10");
-		expected.add("12");
-		expected.add("33");
-		expected.add("ace");
-		expected.add("gi");
-		expected.add("bdf");
-		expected.add("h");
-
-		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, keySelector2, keySelector1);
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void coGroupedBatchReduceTest2() {
-
-		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
-		inputs1.add(new Tuple2<String, Integer>("a", 1));
-		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("a", 3));
-		inputs1.add(new Tuple2<String, Integer>("a", 4));
-		inputs1.add(new Tuple2<String, Integer>("a", 5));
-		inputs1.add(new Tuple2<String, Integer>("b", 6));
-		inputs1.add(new Tuple2<String, Integer>("a", 7));
-		inputs1.add(new Tuple2<String, Integer>("b", 8));
-		inputs1.add(new Tuple2<String, Integer>("b", 9));
-		inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
-		inputs2.add(new Tuple2<String, String>("1", "a"));
-		inputs2.add(new Tuple2<String, String>("2", "b"));
-		inputs2.add(new Tuple2<String, String>("1", "c"));
-		inputs2.add(new Tuple2<String, String>("2", "d"));
-		inputs2.add(new Tuple2<String, String>("1", "e"));
-		inputs2.add(new Tuple2<String, String>("2", "f"));
-		inputs2.add(new Tuple2<String, String>("1", "g"));
-		inputs2.add(new Tuple2<String, String>("2", "h"));
-		inputs2.add(new Tuple2<String, String>("1", "i"));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("10");
-		expected.add("19");
-		expected.add("33");
-		expected.add("ace");
-		expected.add("egi");
-		expected.add("bdf");
-		expected.add("fh");
-
-		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, keySelector2, keySelector1);
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
deleted file mode 100644
index 508366c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedWindowReduceTest {
-
-	KeySelector<Tuple2<String, Integer>, ?> keySelector0 = new KeySelector<Tuple2<String, Integer>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	};
-
-	KeySelector<Tuple2<String, String>, ?> keySelector1 = new KeySelector<Tuple2<String, String>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, String> value) throws Exception {
-			return value.f0;
-		}
-	};
-
-	private static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) {
-			return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
-		}
-
-		@Override
-		public Tuple2<String, String> reduce2(Tuple2<String, String> value1,
-				Tuple2<String, String> value2) {
-			return new Tuple2<String, String>("a", value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple2<String, Integer> value) {
-			return value.f1.toString();
-		}
-
-		@Override
-		public String map2(Tuple2<String, String> value) {
-			return value.f1;
-		}
-	}
-
-	public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
-		private static final long serialVersionUID = 1L;
-
-		private Iterator<Long> timestamps;
-		private long start;
-
-		public MyTimeStamp(List<Long> timestamps) {
-			super(null, 0);
-			this.timestamps = timestamps.iterator();
-			this.start = timestamps.get(0);
-		}
-
-		@Override
-		public long getTimestamp(T value) {
-			long ts = timestamps.next();
-			return ts;
-		}
-
-		@Override
-		public long getStartTime() {
-			return start;
-		}
-	}
-
-	@Test
-	public void coGroupedWindowReduceTest1() {
-
-		List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 1L, 1L, 1L, 2L, 4L, 5L, 6L);
-		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
-		inputs1.add(new Tuple2<String, Integer>("a", 1));
-		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("a", 3));
-		inputs1.add(new Tuple2<String, Integer>("a", 4));
-		inputs1.add(new Tuple2<String, Integer>("a", 5));
-		inputs1.add(new Tuple2<String, Integer>("b", 6));
-		inputs1.add(new Tuple2<String, Integer>("a", 7));
-		inputs1.add(new Tuple2<String, Integer>("b", 8));
-		inputs1.add(new Tuple2<String, Integer>("b", 9));
-		inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-		List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 5L, 5L, 6L, 7L);
-		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
-		inputs2.add(new Tuple2<String, String>("1", "a"));
-		inputs2.add(new Tuple2<String, String>("2", "b"));
-		inputs2.add(new Tuple2<String, String>("1", "c"));
-		inputs2.add(new Tuple2<String, String>("2", "d"));
-		inputs2.add(new Tuple2<String, String>("1", "e"));
-		inputs2.add(new Tuple2<String, String>("2", "f"));
-		inputs2.add(new Tuple2<String, String>("1", "g"));
-		inputs2.add(new Tuple2<String, String>("2", "h"));
-		inputs2.add(new Tuple2<String, String>("1", "i"));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("6");
-		expected.add("22");
-		expected.add("27");
-		expected.add("ace");
-		expected.add("bd");
-		expected.add("g");
-		expected.add("fh");
-		expected.add("i");
-
-		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, keySelector0, keySelector1,
-				new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
-				new MyTimeStamp<Tuple2<String, String>>(timestamps2));
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void coGroupedWindowReduceTest2() {
-
-		List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 2L, 2L, 3L, 4L, 4L, 5L, 6L);
-		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
-		inputs1.add(new Tuple2<String, Integer>("a", 1));
-		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("a", 3));
-		inputs1.add(new Tuple2<String, Integer>("a", 4));
-		inputs1.add(new Tuple2<String, Integer>("a", 5));
-		inputs1.add(new Tuple2<String, Integer>("b", 6));
-		inputs1.add(new Tuple2<String, Integer>("a", 7));
-		inputs1.add(new Tuple2<String, Integer>("b", 8));
-		inputs1.add(new Tuple2<String, Integer>("b", 9));
-		inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-		List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L);
-		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
-		inputs2.add(new Tuple2<String, String>("1", "a"));
-		inputs2.add(new Tuple2<String, String>("2", "b"));
-		inputs2.add(new Tuple2<String, String>("1", "c"));
-		inputs2.add(new Tuple2<String, String>("2", "d"));
-		inputs2.add(new Tuple2<String, String>("1", "e"));
-		inputs2.add(new Tuple2<String, String>("2", "f"));
-		inputs2.add(new Tuple2<String, String>("1", "g"));
-		inputs2.add(new Tuple2<String, String>("2", "h"));
-		inputs2.add(new Tuple2<String, String>("1", "i"));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("15");
-		expected.add("6");
-		expected.add("16");
-		expected.add("23");
-		expected.add("7");
-		expected.add("27");
-		expected.add("ace");
-		expected.add("bdf");
-		expected.add("egi");
-		expected.add("fh");
-
-		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, keySelector0, keySelector1,
-				new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
-				new MyTimeStamp<Tuple2<String, String>>(timestamps2));
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
deleted file mode 100644
index 035a021..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoWindowReduceTest {
-
-	private static class MyCoReduceFunction implements CoReduceFunction<Integer, String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public String map1(Integer value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(String value) {
-			return value;
-		}
-	}
-
-	public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
-		private static final long serialVersionUID = 1L;
-
-		private Iterator<Long> timestamps;
-		private long start;
-
-		public MyTimeStamp(List<Long> timestamps) {
-			super(null, 0);
-			this.timestamps = timestamps.iterator();
-			this.start = timestamps.get(0);
-		}
-
-		@Override
-		public long getTimestamp(T value) {
-			long ts = timestamps.next();
-			return ts;
-		}
-
-		@Override
-		public long getStartTime() {
-			return start;
-		}
-	}
-
-	@Test
-	public void coWindowReduceTest1() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		for (Integer i = 1; i <= 10; i++) {
-			inputs.add(i);
-		}
-
-		List<String> inputs2 = new ArrayList<String>();
-		inputs2.add("a");
-		inputs2.add("b");
-		inputs2.add("c");
-		inputs2.add("d");
-		inputs2.add("e");
-		inputs2.add("f");
-		inputs2.add("g");
-		inputs2.add("h");
-		inputs2.add("i");
-
-		List<Long> timestamps1 = Arrays.asList(0L, 2L, 3L, 5L, 7L, 9L, 10L, 11L, 11L, 13L);
-		List<Long> timestamps2 = Arrays.asList(0L, 1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L);
-
-		CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer, String, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new MyTimeStamp<Integer>(timestamps1),
-				new MyTimeStamp<String>(timestamps2));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("6");
-		expected.add("9");
-		expected.add("30");
-		expected.add("10");
-		expected.add("abcde");
-		expected.add("fghi");
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-
-	}
-
-	@Test
-	public void coWindowReduceTest2() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		for (Integer i = 1; i <= 10; i++) {
-			inputs.add(i);
-		}
-
-		List<String> inputs2 = new ArrayList<String>();
-		inputs2.add("a");
-		inputs2.add("b");
-		inputs2.add("c");
-		inputs2.add("d");
-		inputs2.add("e");
-		inputs2.add("f");
-		inputs2.add("g");
-		inputs2.add("h");
-		inputs2.add("i");
-
-		List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 3L, 8L, 10L, 11L);
-		List<Long> timestamps2 = Arrays.asList(1L, 2L, 4L, 5L, 6L, 9L, 10L, 11L, 13L);
-
-		CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer, String, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new MyTimeStamp<Integer>(timestamps1),
-				new MyTimeStamp<String>(timestamps2));
-
-		List<String> expected = new ArrayList<String>();
-		expected.add("28");
-		expected.add("18");
-		expected.add("8");
-		expected.add("27");
-		expected.add("ab");
-		expected.add("cd");
-		expected.add("de");
-		expected.add("f");
-		expected.add("fgh");
-		expected.add("hi");
-
-		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
-
-		Collections.sort(result);
-		Collections.sort(expected);
-		assertEquals(expected, result);
-
-	}
-
-}


Mime
View raw message