flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [06/20] flink git commit: [streaming] StreamWindow abstraction added with typeinfo and tests
Date Mon, 16 Feb 2015 14:25:32 GMT
[streaming] StreamWindow abstraction added with typeinfo and tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/667f8199
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/667f8199
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/667f8199

Branch: refs/heads/master
Commit: 667f819913ec59a433c51907132f5eca4000e002
Parents: 26ae979
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Feb 4 17:14:51 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:07 2015 +0100

----------------------------------------------------------------------
 .../operator/windowing/StreamWindow.java        | 158 ++++++++++++++++++
 .../windowing/StreamWindowSerializer.java       | 136 ++++++++++++++++
 .../windowing/StreamWindowTypeInfo.java         |  72 +++++++++
 .../operator/windowing/StreamWindowTest.java    | 161 +++++++++++++++++++
 4 files changed, 527 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/667f8199/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
new file mode 100644
index 0000000..b37bf47
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+import com.google.common.collect.Lists;
+
+public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
+
+	private static final long serialVersionUID = -5150196421193988403L;
+	private static Random rnd = new Random();
+
+	public int windowID;
+	public int transformationID;
+
+	public int numberOfParts;
+
+	public StreamWindow() {
+		this(rnd.nextInt(), rnd.nextInt(), 1);
+	}
+
+	public StreamWindow(int windowID) {
+		this(windowID, rnd.nextInt(), 1);
+	}
+
+	public StreamWindow(int windowID, int transformationID, int numberOfParts) {
+		super();
+		this.windowID = windowID;
+		this.transformationID = transformationID;
+		this.numberOfParts = numberOfParts;
+	}
+
+	public StreamWindow(StreamWindow<T> window) {
+		this(window.windowID, window.transformationID, window.numberOfParts);
+		addAll(window);
+	}
+
+	public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
+		this(window.windowID, window.transformationID, window.numberOfParts);
+		for (T element : window) {
+			add(serializer.copy(element));
+		}
+	}
+
+	public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
+		Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
+
+		for (T value : this) {
+			Object key = keySelector.getKey(value);
+			StreamWindow<T> window = partitions.get(key);
+			if (window == null) {
+				window = new StreamWindow<T>(this.windowID, this.transformationID, 0);
+				partitions.put(key, window);
+			}
+			window.add(value);
+		}
+
+		List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
+		int numkeys = partitions.size();
+
+		for (StreamWindow<T> window : partitions.values()) {
+			output.add(window.setNumberOfParts(numkeys));
+		}
+
+		return output;
+	}
+
+	public List<StreamWindow<T>> split(int n) {
+		List<List<T>> subLists = Lists.partition(this, (int) Math.ceil((double) size() / n));
+		List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>(n);
+		for (List<T> partition : subLists) {
+			StreamWindow<T> subWindow = new StreamWindow<T>(windowID, transformationID,
+					subLists.size());
+			subWindow.addAll(partition);
+			split.add(subWindow);
+		}
+		return split;
+	}
+
+	public StreamWindow<T> setNumberOfParts(int n) {
+		this.numberOfParts = n;
+		return this;
+	}
+
+	public boolean compatibleWith(StreamWindow<T> otherWindow) {
+		return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
+	}
+
+	public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
+		StreamWindow<R> window = new StreamWindow<R>(windows[0]);
+		for (int i = 1; i < windows.length; i++) {
+			StreamWindow<R> next = windows[i];
+			if (window.compatibleWith(next)) {
+				window.addAll(next);
+				window.numberOfParts--;
+			} else {
+				throw new RuntimeException("Can only merge compatible windows");
+			}
+		}
+		return window;
+	}
+
+	public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
+		if (windows.isEmpty()) {
+			throw new RuntimeException("Need at least one window to merge");
+		} else {
+			StreamWindow<R> window = new StreamWindow<R>(windows.get(0));
+			for (int i = 1; i < windows.size(); i++) {
+				StreamWindow<R> next = windows.get(i);
+				if (window.compatibleWith(next)) {
+					window.addAll(next);
+					window.numberOfParts--;
+				} else {
+					throw new RuntimeException("Can only merge compatible windows");
+				}
+			}
+			return window;
+		}
+	}
+
+	@Override
+	public void collect(T record) {
+		add(record);
+	}
+
+	@Override
+	public void close() {
+	}
+
+	@Override
+	public String toString() {
+		return super.toString() + " " + windowID + " (" + numberOfParts + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/667f8199/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
new file mode 100755
index 0000000..70c608c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<T> typeSerializer;
+	TypeSerializer<Integer> intSerializer = BasicTypeInfo.INT_TYPE_INFO.createSerializer();
+
+	public StreamWindowSerializer(TypeInformation<T> typeInfo) {
+		this.typeSerializer = typeInfo.createSerializer();
+	}
+
+	public TypeSerializer<T> getObjectSerializer() {
+		return typeSerializer;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return false;
+	}
+
+	@Override
+	public StreamWindow<T> createInstance() {
+		return new StreamWindow<T>(0, 0, 0);
+	}
+
+	@Override
+	public StreamWindow<T> copy(StreamWindow<T> from) {
+		return new StreamWindow<T>(from, typeSerializer);
+	}
+
+	@Override
+	public StreamWindow<T> copy(StreamWindow<T> from, StreamWindow<T> reuse) {
+		reuse.clear();
+		reuse.windowID = from.windowID;
+		reuse.transformationID = from.transformationID;
+		reuse.numberOfParts = from.numberOfParts;
+
+		for (T element : from) {
+			reuse.add(typeSerializer.copy(element));
+		}
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(StreamWindow<T> value, DataOutputView target) throws IOException {
+
+		intSerializer.serialize(value.windowID, target);
+		intSerializer.serialize(value.transformationID, target);
+		intSerializer.serialize(value.numberOfParts, target);
+		intSerializer.serialize(value.size(), target);
+
+		for (T element : value) {
+			typeSerializer.serialize(element, target);
+		}
+	}
+
+	@Override
+	public StreamWindow<T> deserialize(DataInputView source) throws IOException {
+		StreamWindow<T> window = createInstance();
+
+		window.windowID = intSerializer.deserialize(source);
+		window.transformationID = intSerializer.deserialize(source);
+		window.numberOfParts = intSerializer.deserialize(source);
+
+		int size = intSerializer.deserialize(source);
+
+		for (int i = 0; i < size; i++) {
+			window.add(typeSerializer.deserialize(source));
+		}
+
+		return window;
+	}
+
+	@Override
+	public StreamWindow<T> deserialize(StreamWindow<T> reuse, DataInputView source)
+			throws IOException {
+
+		StreamWindow<T> window = reuse;
+		window.clear();
+
+		window.windowID = source.readInt();
+		window.transformationID = source.readInt();
+		window.numberOfParts = source.readInt();
+
+		int size = source.readInt();
+
+		for (int i = 0; i < size; i++) {
+			window.add(typeSerializer.deserialize(source));
+		}
+
+		return window;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// Needs to be implemented
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/667f8199/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
new file mode 100644
index 0000000..a9a4bba
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
+
+	TypeInformation<T> innerType;
+
+	public StreamWindowTypeInfo(TypeInformation<T> innerType) {
+		this.innerType = innerType;
+	}
+
+	public TypeInformation<T> getInnerType() {
+		return innerType;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return innerType.isBasicType();
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return innerType.isTupleType();
+	}
+
+	@Override
+	public int getArity() {
+		return innerType.getArity();
+	}
+
+	@Override
+	public Class<StreamWindow<T>> getTypeClass() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return innerType.isKeyType();
+	}
+
+	@Override
+	public TypeSerializer<StreamWindow<T>> createSerializer() {
+		return new StreamWindowSerializer<T>(innerType);
+	}
+
+	@Override
+	public int getTotalFields() {
+		return innerType.getTotalFields();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/667f8199/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTest.java
new file mode 100644
index 0000000..66a11f5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Test;
+
+public class StreamWindowTest {
+
+	@Test
+	public void creationTest() {
+
+		StreamWindow<Integer> window1 = new StreamWindow<Integer>();
+		assertTrue(window1.isEmpty());
+		assertTrue(window1.windowID != 0);
+
+		window1.add(10);
+		assertEquals(1, window1.size());
+
+		StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1);
+
+		assertTrue(window1.windowID == window2.windowID);
+		assertEquals(1, window2.size());
+
+		StreamWindow<Integer> window3 = new StreamWindow<Integer>(100);
+		assertEquals(100, window3.windowID);
+
+		StreamWindow<Integer> window4 = new StreamWindow<Integer>();
+		assertFalse(window4.windowID == window1.windowID);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void mergeTest() throws IOException {
+		StreamWindow<Integer> window1 = new StreamWindow<Integer>().setNumberOfParts(3);
+		StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1.windowID,
+				window1.transformationID, 3);
+		StreamWindow<Integer> window3 = new StreamWindow<Integer>(window1.windowID,
+				window1.transformationID, 3);
+
+		window1.add(1);
+		window2.add(2);
+		window3.add(3);
+
+		Set<Integer> values = new HashSet<Integer>();
+		values.add(1);
+		values.add(2);
+		values.add(3);
+
+		StreamWindow<Integer> merged = StreamWindow.merge(window1, window2, window3);
+
+		assertEquals(3, merged.size());
+		assertEquals(window1.windowID, merged.windowID);
+		assertEquals(values, new HashSet<Integer>(merged));
+
+		try {
+			StreamWindow.merge(window1, new StreamWindow<Integer>());
+			fail();
+		} catch (RuntimeException e) {
+			// good
+		}
+
+		List<StreamWindow<Integer>> wList = merged.split(3);
+
+		StreamWindow<Integer> merged2 = StreamWindow.merge(wList);
+
+		assertEquals(3, merged2.size());
+		assertEquals(window1.windowID, merged2.windowID);
+		assertEquals(values, new HashSet<Integer>(merged2));
+
+		TypeSerializer<StreamWindow<Integer>> ts = new StreamWindowSerializer<Integer>(
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		TestOutputView ow = new TestOutputView();
+
+		ts.serialize(merged2, ow);
+
+		TestInputView iw = ow.getInputView();
+
+		System.out.println(ts.deserialize(iw));
+
+	}
+
+	@Test
+	public void partitionTest() {
+
+	}
+
+	private class TestOutputView extends DataOutputStream implements DataOutputView {
+
+		public TestOutputView() {
+			super(new ByteArrayOutputStream(4096));
+		}
+
+		public TestInputView getInputView() {
+			ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
+			return new TestInputView(baos.toByteArray());
+		}
+
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			for (int i = 0; i < numBytes; i++) {
+				write(0);
+			}
+		}
+
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			byte[] buffer = new byte[numBytes];
+			source.readFully(buffer);
+			write(buffer);
+		}
+	}
+
+	private class TestInputView extends DataInputStream implements DataInputView {
+
+		public TestInputView(byte[] data) {
+			super(new ByteArrayInputStream(data));
+		}
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			while (numBytes > 0) {
+				int skipped = skipBytes(numBytes);
+				numBytes -= skipped;
+			}
+		}
+	}
+}


Mime
View raw message