flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [38/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:16 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
deleted file mode 100644
index 1c273d3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-
-public abstract class AggregationFunction<T> implements ReduceFunction<T> {
-	private static final long serialVersionUID = 1L;
-
-	public int position;
-
-	public AggregationFunction(int pos) {
-		this.position = pos;
-	}
-
-	public static enum AggregationType {
-		SUM, MIN, MAX, MINBY, MAXBY,
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
deleted file mode 100644
index 226c45a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-
-public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public Comparator comparator;
-	public boolean byAggregate;
-	public boolean first;
-
-	public ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
-		super(pos);
-		this.comparator = Comparator.getForAggregation(aggregationType);
-		this.byAggregate = (aggregationType == AggregationType.MAXBY)
-				|| (aggregationType == AggregationType.MINBY);
-		this.first = first;
-	}
-
-	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
-			TypeInformation<R> typeInfo, AggregationType aggregationType) {
-		return getAggregator(positionToAggregate, typeInfo, aggregationType, false);
-	}
-
-	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
-			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
-
-		if (typeInfo.isTupleType()) {
-			return new TupleComparableAggregator<R>(positionToAggregate, aggregationType, first);
-		} else if (typeInfo instanceof BasicArrayTypeInfo
-				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
-			return new ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first);
-		} else {
-			return new SimpleComparableAggregator<R>(aggregationType);
-		}
-	}
-
-	public static <R> AggregationFunction<R> getAggregator(String field,
-			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
-
-		return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first);
-	}
-
-	private static class TupleComparableAggregator<T> extends ComparableAggregator<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		public TupleComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
-			super(pos, aggregationType, first);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-			Tuple tuple1 = (Tuple) value1;
-			Tuple tuple2 = (Tuple) value2;
-
-			Comparable<Object> o1 = tuple1.getField(position);
-			Object o2 = tuple2.getField(position);
-
-			int c = comparator.isExtremal(o1, o2);
-
-			if (byAggregate) {
-				if (c == 1) {
-					return (T) tuple1;
-				}
-				if (first) {
-					if (c == 0) {
-						return (T) tuple1;
-					}
-				}
-
-				return (T) tuple2;
-
-			} else {
-				if (c == 1) {
-					tuple2.setField(o1, position);
-				}
-				return (T) tuple2;
-			}
-
-		}
-	}
-
-	private static class ArrayComparableAggregator<T> extends ComparableAggregator<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		public ArrayComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
-			super(pos, aggregationType, first);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T reduce(T array1, T array2) throws Exception {
-
-			Object v1 = Array.get(array1, position);
-			Object v2 = Array.get(array2, position);
-
-			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
-
-			if (byAggregate) {
-				if (c == 1) {
-					return array1;
-				}
-				if (first) {
-					if (c == 0) {
-						return array1;
-					}
-				}
-
-				return array2;
-			} else {
-				if (c == 1) {
-					Array.set(array2, position, v1);
-				}
-
-				return array2;
-			}
-		}
-
-	}
-
-	private static class SimpleComparableAggregator<T> extends ComparableAggregator<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		public SimpleComparableAggregator(AggregationType aggregationType) {
-			super(0, aggregationType, false);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			if (comparator.isExtremal((Comparable<Object>) value1, value2) == 1) {
-				return value1;
-			} else {
-				return value2;
-			}
-		}
-
-	}
-
-	private static class PojoComparableAggregator<T> extends ComparableAggregator<T> {
-
-		private static final long serialVersionUID = 1L;
-		PojoComparator<T> pojoComparator;
-
-		public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
-				AggregationType aggregationType, boolean first) {
-			super(0, aggregationType, first);
-			if (!(typeInfo instanceof CompositeType<?>)) {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types and Tuples. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-
-			@SuppressWarnings("unchecked")
-			CompositeType<T> cType = (CompositeType<T>) typeInfo;
-
-			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
-			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
-
-			if (cType instanceof PojoTypeInfo) {
-				pojoComparator = (PojoComparator<T>) cType.createComparator(
-						new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
-			} else {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-		}
-
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			Field[] keyFields = pojoComparator.getKeyFields();
-			Object field1 = pojoComparator.accessField(keyFields[0], value1);
-			Object field2 = pojoComparator.accessField(keyFields[0], value2);
-
-			@SuppressWarnings("unchecked")
-			int c = comparator.isExtremal((Comparable<Object>) field1, field2);
-
-			if (byAggregate) {
-				if (c == 1) {
-					return value1;
-				}
-				if (first) {
-					if (c == 0) {
-						return value1;
-					}
-				}
-
-				return value2;
-			} else {
-				if (c == 1) {
-					keyFields[0].set(value2, field1);
-				} 
-				
-				return value2;
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
deleted file mode 100644
index f56774b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-
-public abstract class Comparator implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract <R> int isExtremal(Comparable<R> o1, R o2);
-
-	public static Comparator getForAggregation(AggregationType type) {
-		switch (type) {
-		case MAX:
-			return new MaxComparator();
-		case MIN:
-			return new MinComparator();
-		case MINBY:
-			return new MinByComparator();
-		case MAXBY:
-			return new MaxByComparator();
-		default:
-			throw new IllegalArgumentException("Unsupported aggregation type.");
-		}
-	}
-
-	private static class MaxComparator extends Comparator {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public <R> int isExtremal(Comparable<R> o1, R o2) {
-			return o1.compareTo(o2) > 0 ? 1 : 0;
-		}
-
-	}
-
-	private static class MaxByComparator extends Comparator {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public <R> int isExtremal(Comparable<R> o1, R o2) {
-			int c = o1.compareTo(o2);
-			if (c > 0) {
-				return 1;
-			}
-			if (c == 0) {
-				return 0;
-			} else {
-				return -1;
-			}
-		}
-
-	}
-
-	private static class MinByComparator extends Comparator {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public <R> int isExtremal(Comparable<R> o1, R o2) {
-			int c = o1.compareTo(o2);
-			if (c < 0) {
-				return 1;
-			}
-			if (c == 0) {
-				return 0;
-			} else {
-				return -1;
-			}
-		}
-
-	}
-
-	private static class MinComparator extends Comparator {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public <R> int isExtremal(Comparable<R> o1, R o2) {
-			return o1.compareTo(o2) < 0 ? 1 : 0;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
deleted file mode 100644
index 142028b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-
-public abstract class SumAggregator {
-
-	public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz,
-			TypeInformation<T> typeInfo) {
-
-		if (typeInfo.isTupleType()) {
-			return new TupleSumAggregator<T>(pos, SumFunction.getForClass(clazz));
-		} else if (typeInfo instanceof BasicArrayTypeInfo
-				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
-			return new ArraySumAggregator<T>(pos, SumFunction.getForClass(clazz));
-		} else {
-			return new SimpleSumAggregator<T>(SumFunction.getForClass(clazz));
-		}
-
-	}
-
-	public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo) {
-
-		return new PojoSumAggregator<T>(field, typeInfo);
-	}
-
-	private static class TupleSumAggregator<T> extends AggregationFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		SumFunction adder;
-
-		public TupleSumAggregator(int pos, SumFunction adder) {
-			super(pos);
-			this.adder = adder;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			Tuple tuple1 = (Tuple) value1;
-			Tuple tuple2 = (Tuple) value2;
-
-			tuple2.setField(adder.add(tuple1.getField(position), tuple2.getField(position)),
-					position);
-
-			return (T) tuple2;
-		}
-
-	}
-
-	private static class ArraySumAggregator<T> extends AggregationFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		SumFunction adder;
-
-		public ArraySumAggregator(int pos, SumFunction adder) {
-			super(pos);
-			this.adder = adder;
-		}
-
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			Object v1 = Array.get(value1, position);
-			Object v2 = Array.get(value2, position);
-			Array.set(value2, position, adder.add(v1, v2));
-			return value2;
-		}
-
-	}
-
-	private static class SimpleSumAggregator<T> extends AggregationFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		SumFunction adder;
-
-		public SimpleSumAggregator(SumFunction adder) {
-			super(0);
-			this.adder = adder;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			return (T) adder.add(value1, value2);
-		}
-
-	}
-
-	private static class PojoSumAggregator<T> extends AggregationFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-		SumFunction adder;
-		PojoComparator<T> comparator;
-
-		public PojoSumAggregator(String field, TypeInformation<?> type) {
-			super(0);
-			if (!(type instanceof CompositeType<?>)) {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types and Tuples. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-
-			@SuppressWarnings("unchecked")
-			CompositeType<T> cType = (CompositeType<T>) type;
-
-			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
-
-			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
-			Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass();
-
-			adder = SumFunction.getForClass(keyClass);
-
-			if (cType instanceof PojoTypeInfo) {
-				comparator = (PojoComparator<T>) cType.createComparator(
-						new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
-			} else {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-		}
-
-		@Override
-		public T reduce(T value1, T value2) throws Exception {
-
-			Field[] keyFields = comparator.getKeyFields();
-			Object field1 = comparator.accessField(keyFields[0], value1);
-			Object field2 = comparator.accessField(keyFields[0], value2);
-
-			keyFields[0].set(value2, adder.add(field1, field2));
-
-			return value2;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
deleted file mode 100644
index 2aef19c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.io.Serializable;
-
-public abstract class SumFunction implements Serializable{
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract Object add(Object o1, Object o2);
-
-	public static SumFunction getForClass(Class<?> clazz) {
-
-		if (clazz == Integer.class) {
-			return new IntSum();
-		} else if (clazz == Long.class) {
-			return new LongSum();
-		} else if (clazz == Short.class) {
-			return new ShortSum();
-		} else if (clazz == Double.class) {
-			return new DoubleSum();
-		} else if (clazz == Float.class) {
-			return new FloatSum();
-		} else if (clazz == Byte.class) {
-			return new ByteSum();
-		} else {
-			throw new RuntimeException("DataStream cannot be summed because the class "
-					+ clazz.getSimpleName() + " does not support the + operator.");
-		}
-	}
-
-	public static class IntSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Integer) value1 + (Integer) value2;
-		}
-	}
-
-	public static class LongSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Long) value1 + (Long) value2;
-		}
-	}
-
-	public static class DoubleSum extends SumFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Double) value1 + (Double) value2;
-		}
-	}
-
-	public static class ShortSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Short) value1 + (Short) value2;
-		}
-	}
-
-	public static class FloatSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Float) value1 + (Float) value2;
-		}
-	}
-
-	public static class ByteSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Byte) value1 + (Byte) value2;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
deleted file mode 100644
index 10e8bac..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoFlatMapFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * A CoFlatMapFunction represents a FlatMap transformation with two different
- * input types.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	public void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
-
-	public void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
deleted file mode 100644
index 2903828..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * A CoMapFunction represents a Map transformation with two different input
- * types.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	public OUT map1(IN1 value);
-
-	public OUT map2(IN2 value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
deleted file mode 100644
index 879a1b4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * The CoReduceFunction interface represents a Reduce transformation with two
- * different input streams. The reduce1 function combines groups of elements of
- * the first input with the same key to a single value, while reduce2 combines
- * groups of elements of the second input with the same key to a single value.
- * Each produced value is mapped to the same type by map1 and map2,
- * respectively, to form one output stream.
- * 
- * The basic syntax for using a grouped CoReduceFunction is as follows:
- * 
- * <pre>
- * <blockquote>
- * ConnectedDataStream<X> input = ...;
- * 
- * ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2)
- *          .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
- * </blockquote>
- * </pre>
- * <p>
- * 
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	/**
-	 * The core method of CoReduceFunction, combining two values of the first
-	 * input into one value of the same type. The reduce1 function is
-	 * consecutively applied to all values of a group until only a single value
-	 * remains.
-	 *
-	 * @param value1
-	 *            The first value to combine.
-	 * @param value2
-	 *            The second value to combine.
-	 * @return The combined value of both input values.
-	 *
-	 * @throws Exception
-	 *             This method may throw exceptions. Throwing an exception will
-	 *             cause the operation to fail and may trigger recovery.
-	 */
-	public IN1 reduce1(IN1 value1, IN1 value2);
-
-	/**
-	 * The core method of CoReduceFunction, combining two values of the second
-	 * input into one value of the same type. The reduce2 function is
-	 * consecutively applied to all values of a group until only a single value
-	 * remains.
-	 *
-	 * @param value1
-	 *            The first value to combine.
-	 * @param value2
-	 *            The second value to combine.
-	 * @return The combined value of both input values.
-	 *
-	 * @throws Exception
-	 *             This method may throw exceptions. Throwing an exception will
-	 *             cause the operation to fail and may trigger recovery.
-	 */
-	public IN2 reduce2(IN2 value1, IN2 value2);
-
-	/**
-	 * Maps the reduced first input to the output type.
-	 * 
-	 * @param value
-	 *            The reduced value of the first input.
-	 * @return The value mapped to the output type.
-	 */
-	public OUT map1(IN1 value);
-
-	/**
-	 * Maps the reduced second input to the output type.
-	 * 
-	 * @param value
-	 *            The reduced value of the second input.
-	 * @return The value mapped to the output type.
-	 */
-	public OUT map2(IN2 value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
deleted file mode 100644
index 2f2514f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {
-
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
deleted file mode 100644
index 9cafcd1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.util.Collector;
-
-public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CrossFunction<IN1, IN2, OUT> crossFunction;
-
-	public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
-		this.crossFunction = crossFunction;
-	}
-
-	@Override
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-		for (IN1 firstValue : first) {
-			for (IN2 secondValue : second) {
-				out.collect(crossFunction.cross(firstValue, secondValue));
-			}
-		}
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
deleted file mode 100644
index 9b39f33..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<IN1, ?> keySelector1;
-	private KeySelector<IN2, ?> keySelector2;
-	private JoinFunction<IN1, IN2, OUT> joinFunction;
-
-	public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
-			JoinFunction<IN1, IN2, OUT> joinFunction) {
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		this.joinFunction = joinFunction;
-	}
-
-	@Override
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-		for (IN1 item1 : first) {
-			Object key1 = keySelector1.getKey(item1);
-
-			for (IN2 item2 : second) {
-				Object key2 = keySelector2.getKey(item2);
-
-				if (key1.equals(key2)) {
-					out.collect(joinFunction.join(item1, item2));
-				}
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
deleted file mode 100644
index 2458f1b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoFlatMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
deleted file mode 100755
index 20d520c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoMapFunction represents a Map transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
deleted file mode 100644
index 655923f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoReduceFunction represents a Reduce transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoReduceFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
deleted file mode 100644
index 2709203..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
-		CoWindowFunction<IN1, IN2, O> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
-			throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
deleted file mode 100644
index 24beba1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple implementation of the SinkFunction writing tuples in the specified
- * OutputFormat format. Tuples are collected to a list and written to the file
- * periodically. The target path and the overwrite mode are pre-packaged in
- * format.
- * 
- * @param <IN>
- *            Input type
- */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected volatile OutputFormat<IN> format;
-	protected volatile boolean cleanupCalled = false;
-	protected int indexInSubtaskGroup;
-	protected int currentNumberOfSubtasks;
-
-	public FileSinkFunction(OutputFormat<IN> format) {
-		this.format = format;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		format.configure(context.getTaskStubParameters());
-		indexInSubtaskGroup = context.getIndexOfThisSubtask();
-		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
-		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
-	}
-
-	@Override
-	public void invoke(IN record) throws Exception {
-		tupleList.add(record);
-		if (updateCondition()) {
-			flush();
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (!tupleList.isEmpty()) {
-			flush();
-		}
-		try {
-			format.close();
-		} catch (Exception ex) {
-			try {
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-		}
-	}
-
-	protected void flush() {
-		try {
-			for (IN rec : tupleList) {
-				format.writeRecord(rec);
-			}
-		} catch (Exception ex) {
-			try {
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-		}
-		resetParameters();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	protected abstract void resetParameters();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
deleted file mode 100644
index f049a32..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.io.OutputFormat;
-
-/**
- * Implementation of FileSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- * 
- * @param <IN>
- *            Input type
- */
-public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
-		super(format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
deleted file mode 100755
index d460749..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.PrintStream;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-
-/**
- * Implementation of the SinkFunction writing every tuple to the standard
- * output or standard error stream.
- * 
- * @param <IN>
- *            Input record type
- */
-public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final boolean STD_OUT = false;
-	private static final boolean STD_ERR = true;
-	
-	private boolean target; 
-	private transient PrintStream stream;
-	private transient String prefix;
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out.
-	 */
-	public PrintSinkFunction() {}
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out.
-	 * 
-	 * @param stdErr True, if the format should print to standard error instead of standard out.
-	 */
-	public PrintSinkFunction(boolean stdErr) {
-		target = stdErr;
-	}
-
-	public void setTargetToStandardOut() {
-		target = STD_OUT;
-	}
-	
-	public void setTargetToStandardErr() {
-		target = STD_ERR;
-	}
-	
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		// get the target stream
-		stream = target == STD_OUT ? System.out : System.err;
-		
-		// set the prefix if we have a >1 DOP
-		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
-				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
-	}
-
-	@Override
-	public void invoke(IN record) {
-		if (prefix != null) {
-			stream.println(prefix + record.toString());
-		}
-		else {
-			stream.println(record.toString());
-		}
-	}
-	
-	@Override
-	public void close() throws Exception {
-		this.stream = null;
-		this.prefix = null;
-		super.close();
-	}
-	
-	@Override
-	public String toString() {
-		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
deleted file mode 100755
index 3b8a4db..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(IN value) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
deleted file mode 100644
index 6097603..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-public interface SinkFunction<IN> extends Function, Serializable {
-
-	public abstract void invoke(IN value) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
deleted file mode 100644
index a606302..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteFormat<IN> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Writes the contents of tupleList to the file specified by path.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param tupleList
-	 *            is the list of tuples to be written
-	 */
-	protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
deleted file mode 100644
index b22fd80..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in csv format.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	protected void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				String strTuple = tupleToWrite.toString();
-				outStream.println(strTuple.substring(1, strTuple.length() - 1));
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
deleted file mode 100644
index 5891104..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsText<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite);
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
deleted file mode 100644
index 0c52afc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	protected final String path;
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected WriteFormat<IN> format;
-
-	public WriteSinkFunction(String path, WriteFormat<IN> format) {
-		this.path = path;
-		this.format = format;
-		cleanFile(path);
-	}
-
-	/**
-	 * Creates target file if it does not exist, cleans it if it exists.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 */
-	protected void cleanFile(String path) {
-		try {
-			PrintWriter writer;
-			writer = new PrintWriter(path);
-			writer.print("");
-			writer.close();
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("File not found " + path, e);
-		}
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	protected abstract void resetParameters();
-
-	/**
-	 * Implementation of the invoke method of the SinkFunction class. Collects
-	 * the incoming tuples in tupleList and appends the list to the end of the
-	 * target file if updateCondition() is true or the current tuple is the
-	 * endTuple.
-	 */
-	@Override
-	public void invoke(IN tuple) {
-
-		tupleList.add(tuple);
-		if (updateCondition()) {
-			format.write(path, tupleList);
-			resetParameters();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
deleted file mode 100644
index ee6df94..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
-		super(path, format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
deleted file mode 100644
index 05a2489..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
-
-	public enum WatchType {
-		ONLY_NEW_FILES, // Only new files will be processed.
-		REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
-		PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
-	}
-
-	private String path;
-	private long interval;
-	private WatchType watchType;
-
-	private FileSystem fileSystem;
-	private Map<String, Long> offsetOfFiles;
-	private Map<String, Long> modificationTimes;
-
-	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
-		this.path = path;
-		this.interval = interval;
-		this.watchType = watchType;
-		this.modificationTimes = new HashMap<String, Long>();
-		this.offsetOfFiles = new HashMap<String, Long>();
-	}
-
-	@Override
-	public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
-		fileSystem = FileSystem.get(new URI(path));
-
-		while (true) {
-			List<String> files = listNewFiles();
-			for (String filePath : files) {
-				if (watchType == WatchType.ONLY_NEW_FILES
-						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
-					collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
-					offsetOfFiles.put(filePath, -1L);
-				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
-					long offset = 0;
-					long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
-					if (offsetOfFiles.containsKey(filePath)) {
-						offset = offsetOfFiles.get(filePath);
-					}
-
-					collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
-					offsetOfFiles.put(filePath, fileSize);
-
-					LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
-				}
-			}
-
-			Thread.sleep(interval);
-		}
-	}
-
-	private List<String> listNewFiles() throws IOException {
-		List<String> files = new ArrayList<String>();
-
-		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
-
-		for (FileStatus status : statuses) {
-			Path filePath = status.getPath();
-			String fileName = filePath.getName();
-			long modificationTime = status.getModificationTime();
-
-			if (!isFiltered(fileName, modificationTime)) {
-				files.add(filePath.toString());
-				modificationTimes.put(fileName, modificationTime);
-			}
-		}
-		return files;
-	}
-
-	private boolean isFiltered(String fileName, long modificationTime) {
-
-		if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
-				|| fileName.startsWith(".") || fileName.contains("_COPYING_")) {
-			return true;
-		} else {
-			Long lastModification = modificationTimes.get(fileName);
-			if (lastModification == null) {
-				return false;
-			} else {
-				return lastModification >= modificationTime;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
deleted file mode 100644
index 0882d9e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
-public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
-		FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
-		stream.seek(value.f1);
-
-		BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
-		String line;
-
-		try {
-			while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
-				out.collect(line);
-			}
-		} finally {
-			reader.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
deleted file mode 100644
index 5dfe4b2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-public class FileSourceFunction extends RichSourceFunction<String> {
-	private static final long serialVersionUID = 1L;
-
-	private InputSplitProvider provider;
-
-	private InputFormat<String, ?> inputFormat;
-
-	private TypeSerializerFactory<String> serializerFactory;
-
-	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
-		this.inputFormat = format;
-		this.serializerFactory = createSerializer(typeInfo);
-	}
-
-	private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
-		TypeSerializer<String> serializer = typeInfo.createSerializer();
-
-		if (serializer.isStateful()) {
-			return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
-		} else {
-			return new RuntimeStatelessSerializerFactory<String>(serializer,
-					typeInfo.getTypeClass());
-		}
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		this.provider = context.getInputSplitProvider();
-		inputFormat.configure(context.getTaskStubParameters());
-	}
-
-	@Override
-	public void invoke(Collector<String> collector) throws Exception {
-		final TypeSerializer<String> serializer = serializerFactory.getSerializer();
-		final Iterator<InputSplit> splitIterator = getInputSplits();
-		@SuppressWarnings("unchecked")
-		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
-		try {
-			while (splitIterator.hasNext()) {
-
-				final InputSplit split = splitIterator.next();
-				String record = serializer.createInstance();
-
-				format.open(split);
-				try {
-					while (!format.reachedEnd()) {
-						if ((record = format.nextRecord(record)) != null) {
-							collector.collect(record);
-						}
-					}
-				} finally {
-					format.close();
-				}
-			}
-			collector.close();
-		} catch (Exception ex) {
-			ex.printStackTrace();
-		}
-	}
-
-	private Iterator<InputSplit> getInputSplits() {
-
-		return new Iterator<InputSplit>() {
-
-			private InputSplit nextSplit;
-
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-
-				if (nextSplit != null) {
-					return true;
-				}
-
-				InputSplit split = provider.getNextInputSplit();
-
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				} else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public InputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final InputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-}


Mime
View raw message