flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/4] git commit: [streaming] Several bugfixes and doc updates
Date Tue, 07 Oct 2014 23:46:52 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 7af127eac -> ec82d973d


[streaming] Several bugfixes and doc updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/49812f3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/49812f3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/49812f3c

Branch: refs/heads/master
Commit: 49812f3c8f65ff80c239f1af090738002094f35d
Parents: 7af127e
Author: szape <nemderogatorius@gmail.com>
Authored: Fri Oct 3 11:10:28 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Wed Oct 8 00:31:32 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 16 +++---
 .../operator/BatchReduceInvokable.java          | 55 +++++++++++++-------
 .../operator/GroupedBatchReduceInvokable.java   |  8 +--
 .../GroupedWindowGroupReduceInvokable.java      |  3 ++
 .../operator/GroupedWindowReduceInvokable.java  | 25 ---------
 .../operator/WindowReduceInvokable.java         |  6 +--
 .../operator/co/CoBatchReduceInvokable.java     | 50 +++++++++++-------
 .../co/CoGroupedBatchReduceInvokable.java       |  2 +-
 .../co/CoGroupedWindowReduceInvokable.java      | 15 ++++--
 .../api/invokable/operator/co/CoInvokable.java  | 16 +++---
 .../operator/co/CoWindowReduceInvokable.java    | 18 +++----
 .../invokable/operator/CoBatchReduceTest.java   |  4 +-
 .../operator/CoGroupedBatchReduceTest.java      |  5 +-
 .../operator/CoGroupedWindowReduceTest.java     |  2 +-
 .../invokable/operator/CoWindowReduceTest.java  |  2 +-
 .../operator/GroupedBatchReduceTest.java        |  2 -
 16 files changed, 116 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 6a8de20..095c146 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -416,8 +416,8 @@ To use this function the user needs to call, the `iteration.setMaxWaitTime(milli
 The usage of rich functions are essentially the same as in the core Flink API. All transformations
that take as argument a user-defined function can instead take a rich function as argument:
 
 ~~~java
-dataStream.map(new RichMapFunction<String, Integer>() {
-  public Integer map(String value) { return value.toString(); }
+dataStream.map(new RichMapFunction<Integer, String>() {
+  public String map(Integer value) { return value.toString(); }
 });
 ~~~
 
@@ -549,7 +549,7 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
 #### Building A Topology
 To use a Kafka connector as a source in Flink call the `addSource()` function with a new
instance of the class which extends `KafkaSource` as parameter:
 
-```java
+~~~java
 DataStream<String> stream1 = env.
     addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
     .print();
@@ -564,7 +564,7 @@ The followings have to be provided for the `MyKafkaSource()` constructor
in orde
 
 Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `KafkaSink`:
 
-```java
+~~~java
 DataStream<String> stream2 = env
     .addSource(new MySource())
     .addSink(new MyKafkaSink("test", "localhost:9092"));
@@ -668,7 +668,7 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
 #### Building A Topology
 To use a Flume connector as a source in Flink call the `addSource()` function with a new
instance of the class which extends `FlumeSource` as parameter:
 
-```java
+~~~java
 DataStream<String> dataStream1 = env
     .addSource(new MyFlumeSource("localhost", 41414))
     .print();
@@ -681,7 +681,7 @@ The followings have to be provided for the `MyFlumeSource()` constructor
in orde
 
 Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `FlumeSink`
 
-```java
+~~~java
 DataStream<String> dataStream2 = env
     .fromElements("one", "two", "three", "four", "five", "q")
     .addSink(new MyFlumeSink("localhost", 42424));
@@ -824,7 +824,7 @@ The followings have to be provided for the `MyRabbitMQSource()` constructor
in o
 
 Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with
a new instance of the class which extends `RabbitMQSink`
 
-```java
+~~~java
 DataStream<String> dataStream2 = env
     .fromElements("one", "two", "three", "four", "five", "q")
     .addSink(new MyRMQSink("localhost", "hello"));
@@ -847,7 +847,7 @@ Twitter Streaming API provides opportunity to connect to the stream of
tweets ma
 In order to connect to Twitter stream the user has to register their program and acquire
the necessary information for the authentication. The process is described below.
 
 #### Acquiring the authentication information
-First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup)
or sing in at Twitter's [Application Management](https://apps.twitter.com/) and register the
application by clicking on the "Create New App" button. Fill out a form about your program
and accept the Terms and Conditions. 
+First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup)
or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the
application by clicking on the "Create New App" button. Fill out a form about your program
and accept the Terms and Conditions. 
 After selecting the application you the API key and API secret (called `consumerKey` and
`sonsumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary
access token data (`token` and `secret`) can be acquired here. 
 Remember to keep these pieces of information a secret and do not push them to public repositories.
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 6672fca..1e6dfa4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.NullableCircularBuffer;
@@ -40,6 +42,8 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 	protected StreamBatch batch;
 	protected StreamBatch currentBatch;
 
+	protected TypeSerializer<OUT> serializer;
+
 	public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long
slideSize) {
 		super(reduceFunction);
 		this.reducer = reduceFunction;
@@ -48,7 +52,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 		this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
 		this.batchPerSlide = slideSize / granularity;
 		this.numberOfBatches = batchSize / granularity;
-		this.batch = new StreamBatch();
 	}
 
 	@Override
@@ -57,7 +60,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 			throw new RuntimeException("DataStream must not be empty");
 		}
 
-		while (reuse != null) {		
+		while (reuse != null) {
 			StreamBatch batch = getBatch(reuse);
 
 			batch.reduceToBuffer(reuse.getObject());
@@ -65,7 +68,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 			resetReuse();
 			reuse = recordIterator.next(reuse);
 		}
-		
+
 		reduceLastBatch();
 
 	}
@@ -86,7 +89,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 	}
 
 	protected void reduceLastBatch() throws Exception {
-		batch.reduceLastBatch();		
+		batch.reduceLastBatch();
 	}
 
 	@Override
@@ -101,7 +104,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 		while (reducedIterator.hasNext()) {
 			OUT next = reducedIterator.next();
 			if (next != null) {
-				reduced = reducer.reduce(reduced, next);
+				reduced = reducer.reduce(serializer.copy(reduced), serializer.copy(next));
 			}
 		}
 		if (reduced != null) {
@@ -115,6 +118,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 		protected long counter;
 		protected long minibatchCounter;
 		protected OUT currentValue;
+		boolean changed;
 
 		protected NullableCircularBuffer circularBuffer;
 
@@ -123,13 +127,13 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 			this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
 			this.counter = 0;
 			this.minibatchCounter = 0;
-
+			this.changed = false;
 		}
 
 		public void reduceToBuffer(OUT nextValue) throws Exception {
 
 			if (currentValue != null) {
-				currentValue = reducer.reduce(currentValue, nextValue);
+				currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
 			} else {
 				currentValue = nextValue;
 			}
@@ -147,20 +151,20 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 
 		protected void addToBuffer() {
 			circularBuffer.add(currentValue);
+			changed = true;
 			minibatchCounter++;
 			currentValue = null;
 		}
 
 		protected boolean miniBatchEnd() {
-			if( (counter % granularity) == 0){
+			if ((counter % granularity) == 0) {
 				counter = 0;
 				return true;
-			}else{
+			} else {
 				return false;
 			}
 		}
-		
-		
+
 		public boolean batchEnd() {
 			if (minibatchCounter == numberOfBatches) {
 				minibatchCounter -= batchPerSlide;
@@ -170,39 +174,50 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT,
OUT> {
 		}
 
 		public void reduceLastBatch() throws Exception {
+
 			if (miniBatchInProgress()) {
 				addToBuffer();
 			}
-
-			if (minibatchCounter >= 0) {
-				for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
-					circularBuffer.remove();
+			if (changed == true && minibatchCounter >= 0) {
+				if (circularBuffer.isFull()) {
+					for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
+						if (!circularBuffer.isEmpty()) {
+							circularBuffer.remove();
+						}
+					}
 				}
 				if (!circularBuffer.isEmpty()) {
 					reduce(this);
 				}
 			}
-
 		}
-		
-		public boolean miniBatchInProgress(){
+
+		public boolean miniBatchInProgress() {
 			return currentValue != null;
 		}
 
 		public void reduceBatch() {
 			reduce(this);
+			changed = false;
 		}
 
 		@SuppressWarnings("unchecked")
 		public Iterator<OUT> getIterator() {
 			return circularBuffer.iterator();
 		}
-		
+
 		@Override
-		public String toString(){
+		public String toString() {
 			return circularBuffer.toString();
 		}
 
 	}
 
+
+	@Override
+	public void open(Configuration config) throws Exception{
+		serializer = inSerializer.getObjectSerializer();
+		this.batch = new StreamBatch();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
index 962db3d..8d67d1b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
@@ -40,8 +40,8 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
 	protected StreamBatch getBatch(StreamRecord<OUT> next) {
 		Object key = next.getField(keyPosition);
 		StreamBatch batch = streamBatches.get(key);
-		if(batch == null){
-			batch=new StreamBatch();
+		if (batch == null) {
+			batch = new StreamBatch();
 			streamBatches.put(key, batch);
 		}
 		return batch;
@@ -49,9 +49,9 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
 
 	@Override
 	protected void reduceLastBatch() throws Exception {
-		for(StreamBatch batch: streamBatches.values()){
+		for (StreamBatch batch : streamBatches.values()) {
 			batch.reduceLastBatch();
-		}		
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
index 7f8a8bf..4937eb8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
@@ -48,6 +48,9 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
 		StreamWindow window = streamWindows.get(key);
 		if (window == null) {
 			window = new GroupedStreamWindow();
+			for (int i = 0; i < currentMiniBatchCount; i++) {
+				window.circularList.newSlide();
+			}
 			streamWindows.put(key, window);
 		}
 		this.window = window;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
index 4e8eb3b..df5c979 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -71,29 +70,6 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT
 			window.reduceLastBatch();
 		}
 	}
-	
-	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<OUT> reducedIterator = currentBatch.getIterator();
-		OUT reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			OUT next = reducedIterator.next();
-			if (next != null) {
-				reduced = reducer.reduce(reduced, next);
-			}
-		}
-		if (reduced != null) {
-			collector.collect(reduced);
-		}else{
-			//remove window if no value received
-			streamWindows.remove(currentBatch);
-		}
-	}
 
 	protected class GroupedStreamWindow extends StreamWindow {
 
@@ -123,7 +99,6 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT
 			}
 			return false;
 		}
-		
 
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index da7a0ff..7a7253a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -34,13 +34,13 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
{
 		super(reduceFunction, windowSize, slideInterval);
 		this.timestamp = timestamp;
 		this.startTime = timestamp.getStartTime();
-		this.window = new StreamWindow();
-		this.batch = this.window;
 	}
 
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
+		this.window = new StreamWindow();
+		this.batch = this.window;
 		if (timestamp instanceof DefaultTimeStamp) {
 			(new TimeCheck()).start();
 		}
@@ -61,7 +61,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
{
 			checkWindowEnd(timestamp.getTimestamp(nextValue));
 
 			if (currentValue != null) {
-				currentValue = reducer.reduce(currentValue, nextValue);
+				currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
 			} else {
 				currentValue = nextValue;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
index 4a2f944..225d7ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -31,8 +31,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 
 	private static final long serialVersionUID = 1L;
 	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-	protected TypeSerializer<IN1> typeSerializer1;
-	protected TypeSerializer<IN2> typeSerializer2;
 
 	protected long slideSize1;
 	protected long slideSize2;
@@ -48,6 +46,8 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 	protected StreamBatch<IN2> batch2;
 	protected StreamBatch<IN1> currentBatch1;
 	protected StreamBatch<IN2> currentBatch2;
+	protected TypeSerializer<IN1> serializer1;
+	protected TypeSerializer<IN2> serializer2;
 
 	public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1,
 			long batchSize2, long slideSize1, long slideSize2) {
@@ -63,8 +63,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 		this.batchPerSlide2 = slideSize2 / granularity2;
 		this.numberOfBatches1 = batchSize1 / granularity1;
 		this.numberOfBatches2 = batchSize2 / granularity2;
-		this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
-		this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
 	}
 
 	@Override
@@ -142,11 +140,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 		while (reducedIterator.hasNext()) {
 			IN1 next = reducedIterator.next();
 			if (next != null) {
-				reduced = coReducer.reduce1(reduced, next);
+				reduced = coReducer.reduce1(serializer1.copy(reduced), serializer1.copy(next));
 			}
 		}
 		if (reduced != null) {
-			collector.collect(coReducer.map1(reduced));
+			collector.collect(coReducer.map1(serializer1.copy(reduced)));
 		}
 	}
 
@@ -162,26 +160,29 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 		while (reducedIterator.hasNext()) {
 			IN2 next = reducedIterator.next();
 			if (next != null) {
-				reduced = coReducer.reduce2(reduced, next);
+				reduced = coReducer.reduce2(serializer2.copy(reduced), serializer2.copy(next));
 			}
 		}
 		if (reduced != null) {
-			collector.collect(coReducer.map2(reduced));
+			collector.collect(coReducer.map2(serializer2.copy(reduced)));
 		}
 	}
 
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		this.typeSerializer1 = serializer1.getObjectSerializer();
-		this.typeSerializer2 = serializer2.getObjectSerializer();
+		this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
+		this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
+		this.serializer1 = srSerializer1.getObjectSerializer();
+		this.serializer2 = srSerializer2.getObjectSerializer();
 	}
 
 	public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamBatch)
 			throws Exception {
 		IN1 nextValue = next.getObject();
 		if (streamBatch.currentValue != null) {
-			streamBatch.currentValue = coReducer.reduce1(streamBatch.currentValue, nextValue);
+			streamBatch.currentValue = coReducer.reduce1(
+					serializer1.copy(streamBatch.currentValue), serializer1.copy(nextValue));
 		} else {
 			streamBatch.currentValue = nextValue;
 		}
@@ -200,7 +201,8 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 			throws Exception {
 		IN2 nextValue = next.getObject();
 		if (streamBatch.currentValue != null) {
-			streamBatch.currentValue = coReducer.reduce2(streamBatch.currentValue, nextValue);
+			streamBatch.currentValue = coReducer.reduce2(
+					serializer2.copy(streamBatch.currentValue), serializer2.copy(nextValue));
 		} else {
 			streamBatch.currentValue = nextValue;
 		}
@@ -220,9 +222,13 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 			streamBatch.addToBuffer();
 		}
 
-		if (streamBatch.minibatchCounter >= 0) {
-			for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) {
-				streamBatch.circularBuffer.remove();
+		if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
+			if (streamBatch.circularBuffer.isFull()) {
+				for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) {
+					if (!streamBatch.circularBuffer.isEmpty()) {
+						streamBatch.circularBuffer.remove();
+					}
+				}
 			}
 			if (!streamBatch.circularBuffer.isEmpty()) {
 				reduce1(streamBatch);
@@ -236,9 +242,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 			streamBatch.addToBuffer();
 		}
 
-		if (streamBatch.minibatchCounter >= 0) {
+		if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
 			for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
-				streamBatch.circularBuffer.remove();
+				if (!streamBatch.circularBuffer.isEmpty()) {
+					streamBatch.circularBuffer.remove();
+				}
 			}
 			if (!streamBatch.circularBuffer.isEmpty()) {
 				reduce2(streamBatch);
@@ -249,10 +257,12 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 
 	public void reduceBatch1(StreamBatch<IN1> streamBatch) {
 		reduce1(streamBatch);
+		streamBatch.changed = false;
 	}
 
 	public void reduceBatch2(StreamBatch<IN2> streamBatch) {
 		reduce2(streamBatch);
+		streamBatch.changed = false;
 	}
 
 	protected class StreamBatch<IN> implements Serializable {
@@ -266,6 +276,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 		protected long granularity;
 		protected long batchPerSlide;
 		protected long numberOfBatches;
+		boolean changed;
 
 		protected NullableCircularBuffer circularBuffer;
 
@@ -279,10 +290,13 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1,
IN2,
 			this.minibatchCounter = 0;
 			this.currentValue = null;
 			this.numberOfBatches = batchSize / granularity;
+			this.changed = false;
+
 		}
 
 		protected void addToBuffer() {
 			circularBuffer.add(currentValue);
+			changed = true;
 			minibatchCounter++;
 			currentValue = null;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
index edfe04d..4d51547 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
index d6942ad..c517c8d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -38,10 +39,6 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
 		super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
 				timestamp2);
 		this.keyPosition1 = keyPosition1;
-		this.batch1 = new GroupedStreamWindow<IN1>(windowSize1, slideInterval1);
-		this.batch2 = new GroupedStreamWindow<IN2>(windowSize2, slideInterval2);
-		// this.batch1 = this.window1;
-		// this.batch2 = this.window2;
 	}
 
 	@Override
@@ -128,6 +125,13 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
 
 	}
 
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		this.batch1 = new GroupedStreamWindow<IN1>(batchSize1, slideSize1);
+		this.batch2 = new GroupedStreamWindow<IN2>(batchSize2, slideSize2);
+	}
+
 	protected class GroupedStreamWindow<IN> extends StreamWindow<IN> {
 
 		private static final long serialVersionUID = 1L;
@@ -156,6 +160,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
 			}
 
 			circularBuffer.add(currentValues);
+			changed = true;
 			minibatchCounter++;
 			currentValues = reuseMap;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 098bbc6..a37be1c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -39,8 +39,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1,
OU
 	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
 	protected StreamRecord<IN1> reuse1;
 	protected StreamRecord<IN2> reuse2;
-	protected StreamRecordSerializer<IN1> serializer1;
-	protected StreamRecordSerializer<IN2> serializer2;
+	protected StreamRecordSerializer<IN1> srSerializer1;
+	protected StreamRecordSerializer<IN2> srSerializer2;
 
 	public void initialize(Collector<OUT> collector,
 			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
@@ -52,22 +52,22 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1,
OU
 		this.reuse1 = serializer1.createInstance();
 		this.reuse2 = serializer2.createInstance();
 
-		this.serializer1 = serializer1;
-		this.serializer2 = serializer2;
+		this.srSerializer1 = serializer1;
+		this.srSerializer2 = serializer2;
 		this.isMutable = isMutable;
 	}
 
 	protected void resetReuseAll() {
-		this.reuse1 = serializer1.createInstance();
-		this.reuse2 = serializer2.createInstance();
+		this.reuse1 = srSerializer1.createInstance();
+		this.reuse2 = srSerializer2.createInstance();
 	}
 
 	protected void resetReuse1() {
-		this.reuse1 = serializer1.createInstance();
+		this.reuse1 = srSerializer1.createInstance();
 	}
 
 	protected void resetReuse2() {
-		this.reuse2 = serializer2.createInstance();
+		this.reuse2 = srSerializer2.createInstance();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
index 505ad0a..d005901 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,9 +32,6 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 	protected TimeStamp<IN1> timestamp1;
 	protected TimeStamp<IN2> timestamp2;
 
-	// protected StreamWindow<IN1> window1;
-	// protected StreamWindow<IN2> window2;
-
 	public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
 			long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
 			TimeStamp<IN2> timestamp2) {
@@ -43,10 +40,7 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 		this.timestamp2 = timestamp2;
 		this.startTime1 = timestamp1.getStartTime();
 		this.startTime2 = timestamp2.getStartTime();
-		this.batch1 = new StreamWindow<IN1>(windowSize1, slideInterval1);
-		this.batch2 = new StreamWindow<IN2>(windowSize2, slideInterval2);
-		// this.batch1 = this.window1;
-		// this.batch2 = this.window2;
+
 	}
 
 	@Override
@@ -57,7 +51,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 		checkBatchEnd1(timestamp1.getTimestamp(nextValue), streamWindow);
 
 		if (streamWindow.currentValue != null) {
-			streamWindow.currentValue = coReducer.reduce1(streamWindow.currentValue, nextValue);
+			streamWindow.currentValue = coReducer.reduce1(
+					serializer1.copy(streamWindow.currentValue), serializer1.copy(nextValue));
 		} else {
 			streamWindow.currentValue = nextValue;
 		}
@@ -71,7 +66,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 		checkBatchEnd2(timestamp2.getTimestamp(nextValue), streamWindow);
 
 		if (streamWindow.currentValue != null) {
-			streamWindow.currentValue = coReducer.reduce2(streamWindow.currentValue, nextValue);
+			streamWindow.currentValue = coReducer.reduce2(
+					serializer2.copy(streamWindow.currentValue), serializer2.copy(nextValue));
 		} else {
 			streamWindow.currentValue = nextValue;
 		}
@@ -138,6 +134,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
+		this.batch1 = new StreamWindow<IN1>(batchSize1, slideSize1);
+		this.batch2 = new StreamWindow<IN2>(batchSize2, slideSize2);
 		if (timestamp1 instanceof DefaultTimeStamp) {
 			(new TimeCheck1()).start();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
index fd7439f..44b0513 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -120,12 +120,10 @@ public class CoBatchReduceTest {
 		expected.add("18");
 		expected.add("26");
 		expected.add("34");
-		expected.add("19");
 		expected.add("abc");
 		expected.add("cde");
 		expected.add("efg");
 		expected.add("ghi");
-		expected.add("i");
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index 0689fca..0a88763 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -132,12 +132,9 @@ public class CoGroupedBatchReduceTest {
 		List<String> expected = new ArrayList<String>();
 		expected.add("10");
 		expected.add("19");
-		expected.add("12");
 		expected.add("33");
-		expected.add("19");
 		expected.add("ace");
 		expected.add("egi");
-		expected.add("i");
 		expected.add("bdf");
 		expected.add("fh");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index b2fc3e5..05cb4d3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 494182b..4604b27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/49812f3c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
index 16c8d92..d78bf61 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
@@ -50,8 +50,6 @@ public class GroupedBatchReduceTest {
 		expected.add(3);
 		expected.add(3);
 		expected.add(15);
-		expected.add(1);
-		expected.add(5);
 
 
 		GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(


Mime
View raw message