flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [36/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
deleted file mode 100644
index ed246c8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	ReduceFunction<IN> reducer;
-
-	public WindowReduceInvokable(ReduceFunction<IN> userFunction,
-			LinkedList<TriggerPolicy<IN>> triggerPolicies,
-			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-		super(userFunction, triggerPolicies, evictionPolicies);
-		this.reducer = userFunction;
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<IN> reducedIterator = buffer.iterator();
-		IN reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			IN next = reducedIterator.next();
-			if (next != null) {
-				reduced = reducer.reduce(copy(reduced), copy(next));
-			}
-		}
-		if (reduced != null) {
-			collector.collect(reduced);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
deleted file mode 100644
index 4cbaebb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-
-public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
-
-	public CoFlatMapInvokable(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
-		super(flatMapper);
-		this.flatMapper = flatMapper;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		flatMapper.flatMap1(reuse1.getObject(), collector);
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		flatMapper.flatMap2(reuse2.getObject(), collector);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
deleted file mode 100644
index 4907ac5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-	private Map<Object, IN1> values1;
-	private Map<Object, IN2> values2;
-	IN1 reduced1;
-	IN2 reduced2;
-
-	public CoGroupedReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
-			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		values1 = new HashMap<Object, IN1>();
-		values2 = new HashMap<Object, IN2>();
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		Object key = reuse1.getKey(keySelector1);
-		currentValue1 = values1.get(key);
-		nextValue1 = reuse1.getObject();
-		if (currentValue1 != null) {
-			callUserFunctionAndLogException1();
-			values1.put(key, reduced1);
-			collector.collect(coReducer.map1(reduced1));
-		} else {
-			values1.put(key, nextValue1);
-			collector.collect(coReducer.map1(nextValue1));
-		}
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		Object key = reuse2.getKey(keySelector2);
-		currentValue2 = values2.get(key);
-		nextValue2 = reuse2.getObject();
-		if (currentValue2 != null) {
-			callUserFunctionAndLogException2();
-			values2.put(key, reduced2);
-			collector.collect(coReducer.map2(reduced2));
-		} else {
-			values2.put(key, nextValue2);
-			collector.collect(coReducer.map2(nextValue2));
-		}
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
deleted file mode 100644
index 604873e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
-
-	public CoInvokable(Function userFunction) {
-		super(userFunction);
-	}
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
-
-	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
-	protected StreamRecord<IN1> reuse1;
-	protected StreamRecord<IN2> reuse2;
-	protected StreamRecordSerializer<IN1> srSerializer1;
-	protected StreamRecordSerializer<IN2> srSerializer2;
-	protected TypeSerializer<IN1> serializer1;
-	protected TypeSerializer<IN2> serializer2;
-
-	@Override
-	public void setup(StreamTaskContext<OUT> taskContext) {
-		this.collector = taskContext.getOutputCollector();
-
-		this.recordIterator = taskContext.getCoReader();
-
-		this.srSerializer1 = taskContext.getInputSerializer(0);
-		this.srSerializer2 = taskContext.getInputSerializer(1);
-
-		this.reuse1 = srSerializer1.createInstance();
-		this.reuse2 = srSerializer2.createInstance();
-
-		this.serializer1 = srSerializer1.getObjectSerializer();
-		this.serializer2 = srSerializer2.getObjectSerializer();
-	}
-
-	protected void resetReuseAll() {
-		this.reuse1 = srSerializer1.createInstance();
-		this.reuse2 = srSerializer2.createInstance();
-	}
-
-	protected void resetReuse1() {
-		this.reuse1 = srSerializer1.createInstance();
-	}
-
-	protected void resetReuse2() {
-		this.reuse2 = srSerializer2.createInstance();
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (true) {
-			int next = recordIterator.next(reuse1, reuse2);
-			if (next == 0) {
-				break;
-			} else if (next == 1) {
-				initialize1();
-				handleStream1();
-				resetReuse1();
-			} else {
-				initialize2();
-				handleStream2();
-				resetReuse2();
-			}
-		}
-	}
-
-	protected abstract void handleStream1() throws Exception;
-
-	protected abstract void handleStream2() throws Exception;
-
-	protected abstract void callUserFunction1() throws Exception;
-
-	protected abstract void callUserFunction2() throws Exception;
-
-	protected void initialize1() {
-
-	};
-
-	protected void initialize2() {
-
-	};
-
-	protected void callUserFunctionAndLogException1() {
-		try {
-			callUserFunction1();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-	protected void callUserFunctionAndLogException2() {
-		try {
-			callUserFunction2();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
deleted file mode 100644
index 5499dba..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-
-public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CoMapFunction<IN1, IN2, OUT> mapper;
-
-	public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
-		super(mapper);
-		this.mapper = mapper;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		collector.collect(mapper.map1(reuse1.getObject()));
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		collector.collect(mapper.map2(reuse2.getObject()));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100644
index 057dfce..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-	protected IN1 currentValue1 = null;
-	protected IN2 currentValue2 = null;
-	protected IN1 nextValue1 = null;
-	protected IN2 nextValue2 = null;
-
-	public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		currentValue1 = null;
-		currentValue2 = null;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		nextValue1 = reuse1.getObject();
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		nextValue2 = reuse2.getObject();
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		if (currentValue1 != null) {
-			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
-		} else {
-			currentValue1 = nextValue1;
-		}
-		collector.collect(coReducer.map1(currentValue1));
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		if (currentValue2 != null) {
-			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
-		} else {
-			currentValue2 = nextValue2;
-		}
-		collector.collect(coReducer.map2(currentValue2));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
deleted file mode 100644
index 93f597f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.state.CircularFifoList;
-
-public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
-	protected long windowSize;
-	protected long slideSize;
-	protected CircularFifoList<StreamRecord<IN1>> circularList1;
-	protected CircularFifoList<StreamRecord<IN2>> circularList2;
-	protected TimestampWrapper<IN1> timeStamp1;
-	protected TimestampWrapper<IN2> timeStamp2;
-
-	protected StreamWindow window;
-
-	protected long startTime;
-	protected long nextRecordTime;
-
-	public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
-			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		super(coWindowFunction);
-		this.coWindowFunction = coWindowFunction;
-		this.windowSize = windowSize;
-		this.slideSize = slideInterval;
-		this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
-		this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-		this.startTime = timeStamp1.getStartTime();
-
-		this.window = new StreamWindow();
-	}
-
-	@Override
-	protected void handleStream1() throws Exception {
-		window.addToBuffer1(reuse1.getObject());
-	}
-
-	@Override
-	protected void handleStream2() throws Exception {
-		window.addToBuffer2(reuse2.getObject());
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-
-		List<IN1> first = new ArrayList<IN1>();
-		List<IN2> second = new ArrayList<IN2>();
-
-		for (IN1 element : window.circularList1.getElements()) {
-			first.add(serializer1.copy(element));
-		}
-		for (IN2 element : window.circularList2.getElements()) {
-			second.add(serializer2.copy(element));
-		}
-
-		if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
-			coWindowFunction.coWindow(first, second, collector);
-		}
-	}
-
-	protected class StreamWindow implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		protected int granularity;
-		protected int batchPerSlide;
-		protected long numberOfBatches;
-
-		protected long minibatchCounter;
-
-		protected CircularFifoList<IN1> circularList1;
-		protected CircularFifoList<IN2> circularList2;
-
-		public StreamWindow() {
-			this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
-			this.batchPerSlide = (int) (slideSize / granularity);
-			this.numberOfBatches = windowSize / granularity;
-			this.circularList1 = new CircularFifoList<IN1>();
-			this.circularList2 = new CircularFifoList<IN2>();
-			this.minibatchCounter = 0;
-		}
-
-		public void addToBuffer1(IN1 nextValue) throws Exception {
-			checkWindowEnd(timeStamp1.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList1.add(nextValue);
-			}
-		}
-
-		public void addToBuffer2(IN2 nextValue) throws Exception {
-			checkWindowEnd(timeStamp2.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList2.add(nextValue);
-			}
-		}
-
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				circularList1.newSlide();
-				circularList2.newSlide();
-				minibatchCounter++;
-				if (windowEnd()) {
-					callUserFunctionAndLogException();
-					circularList1.shiftWindow(batchPerSlide);
-					circularList2.shiftWindow(batchPerSlide);
-				}
-			}
-		}
-
-		protected boolean miniBatchEnd() {
-			if (nextRecordTime < startTime + granularity) {
-				return false;
-			} else {
-				startTime += granularity;
-				return true;
-			}
-		}
-
-		public boolean windowEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public void reduceLastBatch() {
-			if (!miniBatchEnd()) {
-				callUserFunctionAndLogException();
-			}
-		}
-
-		public Iterable<IN1> getIterable1() {
-			return circularList1.getIterable();
-		}
-
-		public Iterable<IN2> getIterable2() {
-			return circularList2.getIterable();
-		}
-
-		@Override
-		public String toString() {
-			return circularList1.toString();
-		}
-
-	}
-
-	@Override
-	public void close() {
-		if (!window.miniBatchEnd()) {
-			callUserFunctionAndLogException();
-		}
-		super.close();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-	}
-
-	public void setSlideSize(long slideSize) {
-		this.slideSize = slideSize;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
deleted file mode 100644
index 76f37be..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Object for wrapping a tuple or other object with ID used for sending records
- * between streaming task in Apache Flink stream processing.
- */
-public class StreamRecord<T> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private UID uid;
-	private T streamObject;
-	public boolean isTuple;
-
-	/**
-	 * Creates an empty StreamRecord
-	 */
-	public StreamRecord() {
-		uid = new UID();
-	}
-
-	/**
-	 * @return The ID of the object
-	 */
-	public UID getId() {
-		return uid;
-	}
-
-	/**
-	 * Creates a new ID for the StreamRecord using the given channelID
-	 * 
-	 * @param channelID
-	 *            ID of the emitting task
-	 * @return The StreamRecord object
-	 */
-	public StreamRecord<T> newId(int channelID) {
-		uid = new UID(channelID);
-		return this;
-	}
-
-	/**
-	 * Sets the ID of the StreamRecord
-	 * 
-	 * @param id
-	 *            id to set
-	 */
-	public void setId(UID id) {
-		this.uid = id;
-	}
-
-	/**
-	 * Gets the wrapped object from the StreamRecord
-	 * 
-	 * @return The object wrapped
-	 */
-	public T getObject() {
-		return streamObject;
-	}
-
-	/**
-	 * Gets the field of the contained object at the given position. If a tuple
-	 * is wrapped then the getField method is invoked. If the StreamRecord
-	 * contains and object of Basic types only position 0 could be returned.
-	 * 
-	 * @param pos
-	 *            Position of the field to get.
-	 * @return Returns the object contained in the position.
-	 */
-	public Object getField(int pos) {
-		if (isTuple) {
-			return ((Tuple) streamObject).getField(pos);
-		} else {
-			if (pos == 0) {
-				return streamObject;
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		}
-	}
-
-	/**
-	 * Extracts key for the stored object using the keySelector provided.
-	 * 
-	 * @param keySelector
-	 *            KeySelector for extracting the key
-	 * @return The extracted key
-	 */
-	public <R> R getKey(KeySelector<T, R> keySelector) {
-		try {
-			return keySelector.getKey(streamObject);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to extract key: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * Sets the object stored
-	 * 
-	 * @param object
-	 *            Object to set
-	 * @return Returns the StreamRecord object
-	 */
-	public StreamRecord<T> setObject(T object) {
-		this.streamObject = object;
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return streamObject.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
deleted file mode 100755
index 85faa9e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<T> typeSerializer;
-	private final boolean isTuple;
-
-	public StreamRecordSerializer(TypeInformation<T> typeInfo) {
-		this.typeSerializer = typeInfo.createSerializer();
-		this.isTuple = typeInfo.isTupleType();
-	}
-
-	public TypeSerializer<T> getObjectSerializer() {
-		return typeSerializer;
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
-	public StreamRecord<T> createInstance() {
-		try {
-			StreamRecord<T> t = new StreamRecord<T>();
-			t.isTuple = isTuple;
-			t.setObject(typeSerializer.createInstance());
-			return t;
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
-		}
-	}
-	
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from) {
-		StreamRecord<T> rec = new StreamRecord<T>();
-		rec.isTuple = from.isTuple;
-		rec.setId(from.getId().copy());
-		rec.setObject(typeSerializer.copy(from.getObject()));
-		return rec;
-	}
-
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-		reuse.isTuple = from.isTuple;
-		reuse.setId(from.getId().copy());
-		reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
-		value.getId().write(target);
-		typeSerializer.serialize(value.getObject(), target);
-	}
-	
-	@Override
-	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-		StreamRecord<T> record = new StreamRecord<T>();
-		record.isTuple = this.isTuple;
-		record.getId().read(source);
-		record.setObject(typeSerializer.deserialize(source));
-		return record;
-	}
-
-	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
-		reuse.getId().read(source);
-		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		// Needs to be implemented
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
deleted file mode 100644
index b79b7e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Object for creating unique IDs for {@link StreamRecord}s.
- * 
- **/
-public class UID implements IOReadableWritable, Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private ByteBuffer uid;
-	private static Random random = new Random();
-
-	public UID() {
-		uid = ByteBuffer.allocate(20);
-	}
-
-	// TODO: consider sequential ids
-	public UID(int channelID) {
-		byte[] uuid = new byte[16];
-		random.nextBytes(uuid);
-		uid = ByteBuffer.allocate(20).putInt(channelID).put(uuid);
-	}
-
-	UID(byte[] id) {
-		uid = ByteBuffer.wrap(id);
-	}
-
-	public int getChannelId() {
-		uid.position(0);
-		return uid.getInt();
-	}
-
-	public byte[] getGeneratedId() {
-		uid.position(4);
-		return uid.slice().array();
-	}
-
-	public byte[] getId() {
-		uid.position(0);
-		return uid.array();
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.write(uid.array());
-	}
-
-	private void writeObject(ObjectOutputStream stream) throws IOException {
-		stream.write(uid.array());
-	}
-
-	private void readObject(java.io.ObjectInputStream stream) throws IOException,
-			ClassNotFoundException {
-		byte[] uidA = new byte[20];
-		stream.read(uidA);
-		uid = ByteBuffer.allocate(20).put(uidA);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		byte[] uidByteArray = new byte[20];
-		in.readFully(uidByteArray, 0, 20);
-		uid = ByteBuffer.wrap(uidByteArray);
-	}
-
-	@Override
-	public String toString() {
-		return getChannelId() + "-" + Long.toHexString(uid.getLong(4)) + "-"
-				+ Long.toHexString(uid.getLong(12));
-	}
-
-	@Override
-	public int hashCode() {
-		return Arrays.hashCode(getId());
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == null) {
-			return false;
-		} else {
-			try {
-				UID other = (UID) obj;
-				return Arrays.equals(this.getId(), other.getId());
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public UID copy() {
-		return new UID(Arrays.copyOf(uid.array(), 20));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
deleted file mode 100644
index 2b650be..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
-import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.util.ArrayList;
-
-public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
-
-	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
-	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
-
-	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
-	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
-
-	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
-	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
-
-	private CoInvokable<IN1, IN2, OUT> userInvokable;
-	private static int numTasks;
-
-	public CoStreamVertex() {
-		userInvokable = null;
-		numTasks = newVertex();
-		instanceID = numTasks;
-	}
-
-	private void setDeserializers() {
-		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-
-		setConfigInputs();
-
-		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
-				inputDeserializer1, inputDeserializer2);
-	}
-
-	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this);
-	}
-
-	protected void setConfigInputs() throws StreamVertexException {
-		setDeserializers();
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<BufferReader> inputList1 = new ArrayList<BufferReader>();
-		ArrayList<BufferReader> inputList2 = new ArrayList<BufferReader>();
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = configuration.getInputIndex(i);
-			BufferReader reader = getEnvironment().getReader(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
-			}
-		}
-
-		final BufferReaderBase reader1 = inputList1.size() == 1
-				? inputList1.get(0)
-				: new UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()]));
-
-		final BufferReaderBase reader2 = inputList2.size() == 1
-				? inputList2.get(0)
-				: new UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()]));
-
-		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(reader1, reader2);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		switch (index) {
-			case 0:
-				return (MutableObjectIterator<X>) inputIter1;
-			case 1:
-				return (MutableObjectIterator<X>) inputIter2;
-			default:
-				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
-		}
-	}
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-		switch (index) {
-			case 0:
-				return (StreamRecordSerializer<X>) inputDeserializer1;
-			case 1:
-				return (StreamRecordSerializer<X>) inputDeserializer2;
-			default:
-				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
-		}
-	}
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-		return (CoReaderIterator<X, Y>) coIter;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
deleted file mode 100644
index ad6d948..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class InputHandler<IN> {
-	private StreamRecordSerializer<IN> inputSerializer = null;
-	private MutableObjectIterator<StreamRecord<IN>> inputIter;
-	private MutableReader<IOReadableWritable> inputs;
-
-	private StreamVertex<IN, ?> streamVertex;
-	private StreamConfig configuration;
-
-	public InputHandler(StreamVertex<IN, ?> streamComponent) {
-		this.streamVertex = streamComponent;
-		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
-		try {
-			setConfigInputs();
-		} catch (Exception e) {
-			throw new StreamVertexException("Cannot register inputs for "
-					+ getClass().getSimpleName(), e);
-		}
-
-	}
-
-	protected void setConfigInputs() throws StreamVertexException {
-		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-		if (numberOfInputs > 0) {
-
-			if (numberOfInputs < 2) {
-
-				inputs = new MutableRecordReader<IOReadableWritable>(streamVertex.getEnvironment().getReader(0));
-
-			} else {
-				UnionBufferReader reader = new UnionBufferReader(streamVertex.getEnvironment().getAllReaders());
-				inputs = new MutableRecordReader<IOReadableWritable>(reader);
-			}
-
-			inputIter = createInputIterator();
-		}
-	}
-
-	private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,
-				inputSerializer);
-		return iter;
-	}
-
-	protected static <T> MutableObjectIterator<StreamRecord<T>> staticCreateInputIterator(
-			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
-
-		// generic data type serialization
-		@SuppressWarnings("unchecked")
-		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		final MutableObjectIterator<StreamRecord<T>> iter = new ReaderIterator(reader, serializer);
-		return iter;
-	}
-
-	public StreamRecordSerializer<IN> getInputSerializer() {
-		return inputSerializer;
-	}
-
-	public MutableObjectIterator<StreamRecord<IN>> getInputIter() {
-		return inputIter;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
deleted file mode 100644
index 1a12cb2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.StreamRecordWriter;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
-	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
-	private StreamVertex<?, OUT> vertex;
-	private StreamConfig configuration;
-	private ClassLoader cl;
-	private Collector<OUT> outerCollector;
-
-	public List<ChainableInvokable<?, ?>> chainedInvokables;
-
-	private Map<String, StreamOutput<?>> outputMap;
-	private Map<String, StreamConfig> chainedConfigs;
-	private List<Tuple2<String, String>> outEdgesInOrder;
-
-	public OutputHandler(StreamVertex<?, OUT> vertex) {
-
-		// Initialize some fields
-		this.vertex = vertex;
-		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
-		this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
-		this.outputMap = new HashMap<String, StreamOutput<?>>();
-		this.cl = vertex.getUserCodeClassLoader();
-
-		// We read the chained configs, and the order of record writer
-		// registrations by outputname
-		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
-		this.chainedConfigs.put(configuration.getTaskName(), configuration);
-
-		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
-
-		// We iterate through all the out edges from this job vertex and create
-		// a stream output
-		for (Tuple2<String, String> outEdge : outEdgesInOrder) {
-			StreamOutput<?> streamOutput = createStreamOutput(outEdge.f1,
-					chainedConfigs.get(outEdge.f0), outEdgesInOrder.indexOf(outEdge));
-			outputMap.put(outEdge.f1, streamOutput);
-		}
-
-		// We create the outer collector that will be passed to the first task
-		// in the chain
-		this.outerCollector = createChainedCollector(configuration);
-
-	}
-
-	public Collection<StreamOutput<?>> getOutputs() {
-		return outputMap.values();
-	}
-
-	/**
-	 * This method builds up a nested collector which encapsulates all the
-	 * chained operators and their network output. The result of this recursive
-	 * call will be passed as collector to the first invokable in the chain.
-	 * 
-	 * @param chainedTaskConfig
-	 *            The configuration of the starting operator of the chain, we
-	 *            use this paramater to recursively build the whole chain
-	 * @return Returns the collector for the chain starting from the given
-	 *         config
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
-
-		boolean isDirectEmit = chainedTaskConfig.isDirectedEmit();
-
-		// We create a wrapper that will encapsulate the chained operators and
-		// network outputs
-		CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper(
-				chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
-
-		// Create collectors for the network outputs
-		for (String output : chainedTaskConfig.getOutputs(cl)) {
-
-			Collector<?> outCollector = outputMap.get(output);
-
-			if (isDirectEmit) {
-				((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
-						chainedTaskConfig.getSelectedNames(output));
-			} else {
-				wrapper.addCollector(outCollector);
-			}
-		}
-
-		// Create collectors for the chained outputs
-		for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
-			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
-			if (isDirectEmit) {
-				((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
-						chainedTaskConfig.getSelectedNames(output));
-			} else {
-				wrapper.addCollector(outCollector);
-			}
-		}
-
-		if (chainedTaskConfig.isChainStart()) {
-			// The current task is the first chained task at this vertex so we
-			// return the wrapper
-			return wrapper;
-		} else {
-			// The current task is a part of the chain so we get the chainable
-			// invokable which will be returned and set it up using the wrapper
-			ChainableInvokable chainableInvokable = chainedTaskConfig.getUserInvokable(vertex
-					.getUserCodeClassLoader());
-			chainableInvokable.setup(wrapper,
-					chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
-
-			chainedInvokables.add(chainableInvokable);
-			return chainableInvokable;
-		}
-
-	}
-
-	public Collector<OUT> getCollector() {
-		return outerCollector;
-	}
-
-	/**
-	 * We create the StreamOutput for the specific output given by the name, and
-	 * the configuration of its source task
-	 * 
-	 * @param outputVertex
-	 *            Name of the output to which the streamoutput will be set up
-	 * @param configuration
-	 *            The config of upStream task
-	 * @return
-	 */
-	private <T> StreamOutput<T> createStreamOutput(String outputVertex, StreamConfig configuration,
-			int outputIndex) {
-
-		StreamRecordSerializer<T> outSerializer = configuration
-				.getTypeSerializerOut1(vertex.userClassLoader);
-		SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
-
-		if (outSerializer != null) {
-			outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
-			outSerializationDelegate.setInstance(outSerializer.createInstance());
-		}
-
-		StreamPartitioner<T> outputPartitioner = configuration.getPartitioner(cl, outputVertex);
-
-		RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
-
-		if (configuration.getBufferTimeout() >= 0) {
-
-			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-					.getEnvironment().getWriter(outputIndex), outputPartitioner,
-					configuration.getBufferTimeout());
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
-						configuration.getBufferTimeout(), vertex.getClass().getSimpleName());
-			}
-		} else {
-			output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-					.getEnvironment().getWriter(outputIndex), outputPartitioner);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("RecordWriter initiated for {}", vertex.getClass().getSimpleName());
-			}
-		}
-
-		StreamOutput<T> streamOutput = new StreamOutput<T>(output, vertex.instanceID,
-				outSerializationDelegate);
-
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
-					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
-		}
-
-		return streamOutput;
-	}
-
-	public void flushOutputs() throws IOException, InterruptedException {
-		for (StreamOutput<?> streamOutput : getOutputs()) {
-			streamOutput.close();
-		}
-	}
-
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
-			throws IOException, InterruptedException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
-		}
-
-		try {
-			vertex.invokeUserFunction(userInvokable);
-		} catch (Exception e) {
-			flushOutputs();
-			throw new RuntimeException(e);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
-		}
-
-		flushOutputs();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
deleted file mode 100755
index bc89241..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
-
-	private OutputHandler<OUT> outputHandler;
-
-	private static int numSources;
-	private Integer iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	@SuppressWarnings("rawtypes")
-	public StreamIterationHead() {
-		numSources = newVertex();
-		instanceID = numSources;
-		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-
-		iterationId = configuration.getIterationId();
-		iterationWaitTime = configuration.getIterationWaitTime();
-		shouldWait = iterationWaitTime > 0;
-
-		try {
-			BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
-		} catch (Exception e) {
-
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		StreamRecord<OUT> nextRecord;
-
-		List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
-		for (StreamOutput<?> output : outputHandler.getOutputs()) {
-			outputs.add((StreamOutput<OUT>) output);
-		}
-
-		while (true) {
-			if (shouldWait) {
-				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				nextRecord = dataChannel.take();
-			}
-			if (nextRecord == null) {
-				break;
-			}
-			for (StreamOutput<OUT> output : outputs) {
-				output.collect(nextRecord.getObject());
-			}
-		}
-
-		outputHandler.flushOutputs();
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
deleted file mode 100755
index b3ecdf8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
-
-	private InputHandler<IN> inputHandler;
-
-	private Integer iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	public StreamIterationTail() {
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		try {
-			inputHandler = new InputHandler<IN>(this);
-
-			iterationId = configuration.getIterationId();
-			iterationWaitTime = configuration.getIterationWaitTime();
-			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString());
-		} catch (Exception e) {
-			throw new StreamVertexException(String.format(
-					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
-		}
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoked", getName());
-		}
-
-		forwardRecords();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoke finished", getName());
-		}
-	}
-
-	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
-		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
-			if (!pushToQueue(reuse)) {
-				break;
-			}
-			// TODO: Fix object reuse for iteration
-			reuse = inputHandler.getInputSerializer().createInstance();
-		}
-	}
-
-	private boolean pushToQueue(StreamRecord<IN> record) {
-		try {
-			if (shouldWait) {
-				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				dataChannel.put(record);
-				return true;
-			}
-		} catch (InterruptedException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
-						StringUtils.stringifyException(e));
-			}
-			return false;
-		}
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
deleted file mode 100644
index 7fbab3b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public interface StreamTaskContext<OUT> {
-
-	StreamConfig getConfig();
-
-	ClassLoader getUserCodeClassLoader();
-
-	<X> MutableObjectIterator<X> getInput(int index);
-
-	<X> StreamRecordSerializer<X> getInputSerializer(int index);
-
-	Collector<OUT> getOutputCollector();
-
-	<X, Y> CoReaderIterator<X, Y> getCoReader();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
deleted file mode 100644
index 994b1fa..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Map;
-
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
-
-	private static int numTasks;
-
-	protected StreamConfig configuration;
-	protected int instanceID;
-	private static int numVertices = 0;
-
-	private InputHandler<IN> inputHandler;
-	protected OutputHandler<OUT> outputHandler;
-	private StreamInvokable<IN, OUT> userInvokable;
-
-	private StreamingRuntimeContext context;
-	private Map<String, OperatorState<?>> states;
-
-	protected ClassLoader userClassLoader;
-
-	public StreamVertex() {
-		userInvokable = null;
-		numTasks = newVertex();
-		instanceID = numTasks;
-	}
-
-	protected static int newVertex() {
-		numVertices++;
-		return numVertices;
-	}
-
-	@Override
-	public void registerInputOutput() {
-		initialize();
-		setInputsOutputs();
-		setInvokable();
-	}
-
-	protected void initialize() {
-		this.userClassLoader = getUserCodeClassLoader();
-		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.states = configuration.getOperatorStates(userClassLoader);
-		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
-	}
-
-	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
-		userInvokable.setRuntimeContext(context);
-		userInvokable.open(getTaskConfiguration());
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.setRuntimeContext(context);
-			invokable.open(getTaskConfiguration());
-		}
-
-		userInvokable.invoke();
-		userInvokable.close();
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.close();
-		}
-
-	}
-
-	public void setInputsOutputs() {
-		inputHandler = new InputHandler<IN>(this);
-		outputHandler = new OutputHandler<OUT>(this);
-	}
-
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this);
-	}
-
-	public String getName() {
-		return getEnvironment().getTaskName();
-	}
-
-	public int getInstanceID() {
-		return instanceID;
-	}
-
-	public StreamingRuntimeContext createRuntimeContext(String taskName,
-			Map<String, OperatorState<?>> states) {
-		Environment env = getEnvironment();
-		return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("TASK", userInvokable);
-	}
-
-	@Override
-	public StreamConfig getConfig() {
-		return configuration;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		if (index == 0) {
-			return (MutableObjectIterator<X>) inputHandler.getInputIter();
-		} else {
-			throw new IllegalArgumentException("There is only 1 input");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-		if (index == 0) {
-			return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
-		} else {
-			throw new IllegalArgumentException("There is only 1 input");
-		}
-	}
-
-	@Override
-	public Collector<OUT> getOutputCollector() {
-		return outputHandler.getCollector();
-	}
-
-	@Override
-	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-		throw new IllegalArgumentException("CoReader not available");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
deleted file mode 100644
index ed8b91e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-/**
- * An exception that is thrown by the stream verices when encountering an
- * illegal condition.
- */
-public class StreamVertexException extends RuntimeException {
-
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8392043527067472439L;
-
-	/**
-	 * Creates a compiler exception with no message and no cause.
-	 */
-	public StreamVertexException() {
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and no cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 */
-	public StreamVertexException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a compiler exception with the given cause and no message.
-	 * 
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamVertexException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamVertexException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
deleted file mode 100644
index a1a64e2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.state.OperatorState;
-
-/**
- * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
- * operators.
- */
-public class StreamingRuntimeContext extends RuntimeUDFContext {
-
-	public Environment env;
-	private final Map<String, OperatorState<?>> operatorStates;
-
-	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-			Map<String, OperatorState<?>> operatorStates) {
-		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
-				userCodeClassLoader, env.getCopyTask());
-		this.env = env;
-		this.operatorStates = operatorStates;
-	}
-
-	/**
-	 * Returns the operator state registered by the given name for the operator.
-	 * 
-	 * @param name
-	 *            Name of the operator state to be returned.
-	 * @return The operator state.
-	 */
-	public OperatorState<?> getState(String name) {
-		if (operatorStates == null) {
-			throw new RuntimeException("No state has been registered for this operator.");
-		} else {
-			OperatorState<?> state = operatorStates.get(name);
-			if (state != null) {
-				return state;
-			} else {
-				throw new RuntimeException("No state has been registered for the name: " + name);
-			}
-		}
-
-	}
-
-	/**
-	 * Returns the input split provider associated with the operator.
-	 * 
-	 * @return The input split provider.
-	 */
-	public InputSplitProvider getInputSplitProvider() {
-		return env.getInputSplitProvider();
-	}
-
-	/**
-	 * Returns the stub parameters associated with the {@link TaskConfig} of the
-	 * operator.
-	 * 
-	 * @return The stub parameters.
-	 */
-	public Configuration getTaskStubParameters() {
-		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
deleted file mode 100644
index 15aaf51..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the cosine distance between two given vectors.
- * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
- * 
- * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -1217813582965151599L;
-
-	public CosineDistance() {
-		super(null);
-	}
-
-	public CosineDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		if (isNullvector(oldDataPoint, newDataPoint)) {
-			return 0;
-		}
-
-		double sum1 = 0;
-		double sum2 = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			sum1 += oldDataPoint[i] * oldDataPoint[i];
-			sum2 += newDataPoint[i] * newDataPoint[i];
-		}
-		sum1 = Math.sqrt(sum1);
-		sum2 = Math.sqrt(sum2);
-
-		return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
-	}
-
-	private double dotProduct(double[] a, double[] b) {
-		double result = 0;
-		for (int i = 0; i < a.length; i++) {
-			result += a[i] * b[i];
-		}
-		return result;
-	}
-
-	private boolean isNullvector(double[]... vectors) {
-		outer: for (double[] v : vectors) {
-			for (double field : v) {
-				if (field != 0) {
-					continue outer;
-				}
-			}
-			// This position is only reached in case all fields are 0.
-			return true;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
deleted file mode 100644
index b2223d6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import java.io.Serializable;
-
-/**
- * This interface allows the implementation of a function which calculates the
- * delta between two data points. Delta functions might be used in delta
- * policies and allow flexible adaptive windowing based on the arriving data
- * points.
- *
- * @param <DATA>
- *            The type of input data which can be compared using this function.
- */
-public interface DeltaFunction<DATA> extends Serializable {
-
-	/**
-	 * Calculates the delta between two given data points.
-	 * 
-	 * @param oldDataPoint
-	 *            the old data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two given points.
-	 */
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint);
-
-}


Mime
View raw message