flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/51] [abbrv] git commit: [streaming] windowReduce added + BatchReduceInvokable reworked
Date Mon, 18 Aug 2014 17:25:40 GMT
[streaming] windowReduce added + BatchReduceInvokable reworked


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6e521955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6e521955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6e521955

Branch: refs/heads/master
Commit: 6e521955d8477946d780f36ea9a11180f08a86e1
Parents: 5dbf815
Author: gyfora <gyula.fora@gmail.com>
Authored: Tue Jul 22 17:38:50 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |  22 +++
 .../api/invokable/operator/BatchIterator.java   |   7 +
 .../operator/BatchReduceInvokable.java          | 133 +++++++++++++++----
 3 files changed, 134 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e521955/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index e67ac0e..4356795 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -388,6 +388,28 @@ public class DataStream<T extends Tuple> {
 	}
 
 	/**
+	 * Applies a reduce transformation on preset "time" chunks of the
+	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
+	 * records received during the predefined time window. The window is shifted
+	 * after each reduce call. Each GroupReduceFunction call can return any
+	 * number of elements including none.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            The time window to run the reducer on, in milliseconds.
+	 * @param <R>
+	 *            output type
+	 * @return The modified DataStream.
+	 */
+	public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
+			long windowSize) {
+		return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
+				new BatchReduceInvokable<T, R>(reducer, windowSize));
+	}
+
+	/**
 	 * Adds the given sink to this environment. Only streams with sinks added
 	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 	 * method is called.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e521955/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
new file mode 100755
index 0000000..1432749
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.Iterator;
+
+public interface BatchIterator<IN> extends Iterator<IN> {
+	public void reset();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e521955/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 811a929..2d2d890 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -20,73 +20,149 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
-	private int counter;
 	private int batchSize;
+	private long windowSize;
+	volatile boolean isRunning;
+	boolean window;
 
 	private GroupReduceFunction<IN, OUT> reducer;
 
 	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
 		this.reducer = reduceFunction;
-		this.counter = 0;
 		this.batchSize = batchSize;
 	}
 
+	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+		this.reducer = reduceFunction;
+		this.windowSize = windowSize;
+		this.window = true;
+	}
+
 	@Override
 	public void invoke() throws Exception {
-		MyIterator it = new MyIterator();
-		if (this.isMutable) {
-			do {
-				reducer.reduce(it, collector);
-				it.reset();
-			} while (reuse != null);
+		BatchIterator<IN> userIterator;
+		if (window) {
+			userIterator = new WindowIterator();
 		} else {
-			do {
-				reducer.reduce(it, collector);
-				it.reset();
-			} while (reuse != null);
+			userIterator = new CounterIterator();
 		}
 
+		do {
+			if (userIterator.hasNext()) {
+				reducer.reduce(userIterator, collector);
+				userIterator.reset();
+			}
+		} while (reuse != null);
+	}
+
+	private StreamRecord<IN> loadNextRecord() {
+		if (!isMutable) {
+			resetReuse();
+		}
+		try {
+			reuse = recordIterator.next(reuse);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return reuse;
 	}
 
-	public class MyIterator implements Iterator<IN> {
+	public class CounterIterator implements BatchIterator<IN> {
+		private int counter;
+		private boolean loadedNext;
 
-		public MyIterator() {
-			reset();
+		public CounterIterator() {
+			counter = 1;
 		}
 
 		@Override
 		public boolean hasNext() {
-
-			if (counter >= batchSize) {
+			if (counter > batchSize) {
 				return false;
+			} else if (!loadedNext) {
+				loadNextRecord();
+				loadedNext = true;
+			}
+			return (reuse != null);
+		}
+
+		@Override
+		public IN next() {
+			if (hasNext()) {
+				counter++;
+				loadedNext = false;
+				return reuse.getTuple();
 			} else {
-				try {
-					resetReuse();
-					reuse = recordIterator.next(reuse);
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
-				return (reuse != null);
+				counter++;
+				loadedNext = false;
+				return null;
+			}
+		}
+
+		public void reset() {
+			for (int i = 0; i < (batchSize - counter); i++) {
+				loadNextRecord();
 			}
+			loadNextRecord();
+			loadedNext = true;
+			counter = 1;
+		}
+
+		@Override
+		public void remove() {
+
+		}
+
+	}
+
+	public class WindowIterator implements BatchIterator<IN> {
+
+		volatile boolean iterate;
+		private boolean loadedNext;
+		private long startTime;
+
+		public WindowIterator() {
+			startTime = System.currentTimeMillis();
+		}
+
+		@Override
+		public boolean hasNext() {
+			if (System.currentTimeMillis() - startTime > windowSize) {
+				return false;
+			} else if (!loadedNext) {
+				loadNextRecord();
+				loadedNext = true;
+			}
+			return (reuse != null);
 		}
 
 		@Override
 		public IN next() {
-			counter++;
-			return reuse.getTuple();
+			if (hasNext()) {
+				loadedNext = false;
+				return reuse.getTuple();
+			} else {
+				loadedNext = false;
+				return reuse.getTuple();
+			}
 		}
 
 		public void reset() {
-			counter = 0;
+			while (System.currentTimeMillis() - startTime < windowSize) {
+				loadNextRecord();
+			}
+			loadNextRecord();
+			loadedNext = true;
+			startTime = System.currentTimeMillis();
 		}
 
 		@Override
@@ -95,4 +171,5 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		}
 
 	}
+
 }
\ No newline at end of file


Mime
View raw message