flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [10/19] flink git commit: [streaming] Major internal renaming and restructure
Date Wed, 15 Apr 2015 09:38:51 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
deleted file mode 100644
index b79b7e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Object for creating unique IDs for {@link StreamRecord}s.
- * 
- **/
-public class UID implements IOReadableWritable, Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private ByteBuffer uid;
-	private static Random random = new Random();
-
-	public UID() {
-		uid = ByteBuffer.allocate(20);
-	}
-
-	// TODO: consider sequential ids
-	public UID(int channelID) {
-		byte[] uuid = new byte[16];
-		random.nextBytes(uuid);
-		uid = ByteBuffer.allocate(20).putInt(channelID).put(uuid);
-	}
-
-	UID(byte[] id) {
-		uid = ByteBuffer.wrap(id);
-	}
-
-	public int getChannelId() {
-		uid.position(0);
-		return uid.getInt();
-	}
-
-	public byte[] getGeneratedId() {
-		uid.position(4);
-		return uid.slice().array();
-	}
-
-	public byte[] getId() {
-		uid.position(0);
-		return uid.array();
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.write(uid.array());
-	}
-
-	private void writeObject(ObjectOutputStream stream) throws IOException {
-		stream.write(uid.array());
-	}
-
-	private void readObject(java.io.ObjectInputStream stream) throws IOException,
-			ClassNotFoundException {
-		byte[] uidA = new byte[20];
-		stream.read(uidA);
-		uid = ByteBuffer.allocate(20).put(uidA);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		byte[] uidByteArray = new byte[20];
-		in.readFully(uidByteArray, 0, 20);
-		uid = ByteBuffer.wrap(uidByteArray);
-	}
-
-	@Override
-	public String toString() {
-		return getChannelId() + "-" + Long.toHexString(uid.getLong(4)) + "-"
-				+ Long.toHexString(uid.getLong(12));
-	}
-
-	@Override
-	public int hashCode() {
-		return Arrays.hashCode(getId());
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == null) {
-			return false;
-		} else {
-			try {
-				UID other = (UID) obj;
-				return Arrays.equals(this.getId(), other.getId());
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public UID copy() {
-		return new UID(Arrays.copyOf(uid.array(), 20));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
deleted file mode 100644
index f277be0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamEdge;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.io.InputGateFactory;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
-
-	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
-	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
-
-	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
-	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
-
-	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
-	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
-
-	private static int numTasks;
-
-	public CoStreamVertex() {
-		numTasks = newVertex();
-		instanceID = numTasks;
-	}
-
-	private void setDeserializers() {
-		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-
-		setConfigInputs();
-
-		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
-				inputDeserializer1, inputDeserializer2);
-	}
-
-	@Override
-	public void clearBuffers() throws IOException {
-		outputHandler.clearWriters();
-		coReader.clearBuffers();
-		coReader.cleanup();
-	}
-
-	protected void setConfigInputs() throws StreamVertexException {
-		setDeserializers();
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
-		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = inEdges.get(i).getTypeNumber();
-			InputGate reader = getEnvironment().getInputGate(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
-			}
-		}
-
-		final InputGate reader1 = InputGateFactory.createInputGate(inputList1);
-		final InputGate reader2 = InputGateFactory.createInputGate(inputList2);
-
-		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
-				reader1, reader2);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		switch (index) {
-			case 0:
-				return (MutableObjectIterator<X>) inputIter1;
-			case 1:
-				return (MutableObjectIterator<X>) inputIter2;
-			default:
-				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
-		}
-	}
-
-	@Override
-	public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
-		throw new UnsupportedOperationException("Currently unsupported for connected streams");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-		switch (index) {
-			case 0:
-				return (StreamRecordSerializer<X>) inputDeserializer1;
-			case 1:
-				return (StreamRecordSerializer<X>) inputDeserializer2;
-			default:
-				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-		return (CoReaderIterator<X, Y>) coIter;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
deleted file mode 100644
index c6a4377..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.IndexedMutableReader;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.io.InputGateFactory;
-
-public class InputHandler<IN> {
-	private StreamRecordSerializer<IN> inputSerializer = null;
-	private IndexedReaderIterator<StreamRecord<IN>> inputIter;
-	private IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>> inputs;
-
-	private StreamVertex<IN, ?> streamVertex;
-	private StreamConfig configuration;
-
-	public InputHandler(StreamVertex<IN, ?> streamComponent) {
-		this.streamVertex = streamComponent;
-		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
-		try {
-			setConfigInputs();
-		} catch (Exception e) {
-			throw new StreamVertexException("Cannot register inputs for "
-					+ getClass().getSimpleName(), e);
-		}
-
-	}
-
-	protected void setConfigInputs() throws StreamVertexException {
-		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs > 0) {
-			InputGate inputGate = InputGateFactory.createInputGate(streamVertex.getEnvironment().getAllInputGates());
-			inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
-
-			inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
-					StreamingSuperstep.class);
-
-			inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
-		}
-	}
-
-	protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(
-			MutableReader<?> inputReader, TypeSerializer<StreamRecord<T>> serializer) {
-
-		// generic data type serialization
-		@SuppressWarnings("unchecked")
-		IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>> reader = (IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>>) inputReader;
-		final IndexedReaderIterator<StreamRecord<T>> iter = new IndexedReaderIterator<StreamRecord<T>>(
-				reader, serializer);
-		return iter;
-	}
-
-	public StreamRecordSerializer<IN> getInputSerializer() {
-		return inputSerializer;
-	}
-
-	public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
-		return inputIter;
-	}
-
-	public void clearReaders() throws IOException {
-		if (inputs != null) {
-			inputs.clearBuffers();
-			inputs.cleanup();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
deleted file mode 100644
index 40a83f3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.StreamEdge;
-import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.RecordWriterFactory;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
-	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
-	private StreamVertex<?, OUT> vertex;
-	private StreamConfig configuration;
-	private ClassLoader cl;
-	private Collector<OUT> outerCollector;
-
-	public List<ChainableInvokable<?, ?>> chainedInvokables;
-
-	private Map<StreamEdge, StreamOutput<?>> outputMap;
-
-	private Map<Integer, StreamConfig> chainedConfigs;
-	private List<StreamEdge> outEdgesInOrder;
-
-	public OutputHandler(StreamVertex<?, OUT> vertex) {
-
-		// Initialize some fields
-		this.vertex = vertex;
-		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
-		this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
-		this.outputMap = new HashMap<StreamEdge, StreamOutput<?>>();
-		this.cl = vertex.getUserCodeClassLoader();
-
-		// We read the chained configs, and the order of record writer
-		// registrations by outputname
-		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
-		this.chainedConfigs.put(configuration.getVertexID(), configuration);
-
-		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
-
-		// We iterate through all the out edges from this job vertex and create
-		// a stream output
-		for (StreamEdge outEdge : outEdgesInOrder) {
-			StreamOutput<?> streamOutput = createStreamOutput(
-					outEdge,
-					outEdge.getTargetID(),
-					chainedConfigs.get(outEdge.getSourceID()),
-					outEdgesInOrder.indexOf(outEdge));
-			outputMap.put(outEdge, streamOutput);
-		}
-
-		// We create the outer collector that will be passed to the first task
-		// in the chain
-		this.outerCollector = createChainedCollector(configuration);
-	}
-
-	public void broadcastBarrier(long id) throws IOException, InterruptedException {
-		StreamingSuperstep barrier = new StreamingSuperstep(id);
-		for (StreamOutput<?> streamOutput : outputMap.values()) {
-			streamOutput.broadcastEvent(barrier);
-		}
-	}
-
-	public Collection<StreamOutput<?>> getOutputs() {
-		return outputMap.values();
-	}
-
-	/**
-	 * This method builds up a nested collector which encapsulates all the
-	 * chained operators and their network output. The result of this recursive
-	 * call will be passed as collector to the first invokable in the chain.
-	 *
-	 * @param chainedTaskConfig
-	 * 		The configuration of the starting operator of the chain, we
-	 * 		use this paramater to recursively build the whole chain
-	 * @return Returns the collector for the chain starting from the given
-	 * config
-	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
-
-
-		// We create a wrapper that will encapsulate the chained operators and
-		// network outputs
-
-		OutputSelectorWrapper<OUT> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
-		CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(outputSelectorWrapper);
-
-		// Create collectors for the network outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
-			Collector<?> outCollector = outputMap.get(outputEdge);
-
-			wrapper.addCollector(outCollector, outputEdge);
-		}
-
-		// Create collectors for the chained outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
-			Integer output = outputEdge.getTargetID();
-
-			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
-
-			wrapper.addCollector(outCollector, outputEdge);
-		}
-
-		if (chainedTaskConfig.isChainStart()) {
-			// The current task is the first chained task at this vertex so we
-			// return the wrapper
-			return wrapper;
-		} else {
-			// The current task is a part of the chain so we get the chainable
-			// invokable which will be returned and set it up using the wrapper
-			ChainableInvokable chainableInvokable = chainedTaskConfig.getUserInvokable(vertex
-					.getUserCodeClassLoader());
-			chainableInvokable.setup(wrapper,
-					chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
-
-			chainedInvokables.add(chainableInvokable);
-			return chainableInvokable;
-		}
-
-	}
-
-	public Collector<OUT> getCollector() {
-		return outerCollector;
-	}
-
-	/**
-	 * We create the StreamOutput for the specific output given by the id, and
-	 * the configuration of its source task
-	 *
-	 * @param outputVertex
-	 * 		Name of the output to which the streamoutput will be set up
-	 * @param upStreamConfig
-	 * 		The config of upStream task
-	 * @return The created StreamOutput
-	 */
-	private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
-			StreamConfig upStreamConfig, int outputIndex) {
-
-		StreamRecordSerializer<T> outSerializer = upStreamConfig
-				.getTypeSerializerOut1(vertex.userClassLoader);
-		SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
-
-		if (outSerializer != null) {
-			outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
-			outSerializationDelegate.setInstance(outSerializer.createInstance());
-		}
-
-		@SuppressWarnings("unchecked")
-		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
-		ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
-
-		RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
-				RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-
-		StreamOutput<T> streamOutput = new StreamOutput<T>(output, vertex.instanceID,
-				outSerializationDelegate);
-
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
-					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
-		}
-
-		return streamOutput;
-	}
-
-	public void flushOutputs() throws IOException, InterruptedException {
-		for (StreamOutput<?> streamOutput : getOutputs()) {
-			streamOutput.close();
-		}
-	}
-
-	public void clearWriters() {
-		for (StreamOutput<?> output : outputMap.values()) {
-			output.clearBuffers();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
deleted file mode 100644
index 816c0d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Collection;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
-
-	private Collection<StreamOutput<?>> outputs;
-
-	private static int numSources;
-	private Integer iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	@SuppressWarnings("rawtypes")
-	public StreamIterationHead() {
-		numSources = newVertex();
-		instanceID = numSources;
-		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-		outputs = outputHandler.getOutputs();
-
-		iterationId = configuration.getIterationId();
-		iterationWaitTime = configuration.getIterationWaitTime();
-		shouldWait = iterationWaitTime > 0;
-
-		try {
-			BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" 
-					+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Iteration source {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		try {
-			StreamRecord<OUT> nextRecord;
-
-			while (true) {
-				if (shouldWait) {
-					nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-				} else {
-					nextRecord = dataChannel.take();
-				}
-				if (nextRecord == null) {
-					break;
-				}
-				for (StreamOutput<?> output : outputs) {
-					((StreamOutput<OUT>) output).collect(nextRecord.getObject());
-				}
-			}
-
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
-			}
-			throw e;
-		} finally {
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
-
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
deleted file mode 100644
index ab09aff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
-
-	private InputHandler<IN> inputHandler;
-
-	private Integer iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	public StreamIterationTail() {
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		try {
-			inputHandler = new InputHandler<IN>(this);
-
-			iterationId = configuration.getIterationId();
-			iterationWaitTime = configuration.getIterationWaitTime();
-			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
-					+getEnvironment().getIndexInSubtaskGroup());
-		} catch (Exception e) {
-			throw new StreamVertexException(String.format(
-					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
-		}
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Iteration sink {} invoked", getName());
-		}
-
-		try {
-			forwardRecords();
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Iteration sink {} invoke finished", getName());
-			}
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
-			}
-			throw e;
-		} finally {
-			// Cleanup
-			clearBuffers();
-		}
-	}
-
-	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
-		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
-			if (!pushToQueue(reuse)) {
-				break;
-			}
-			reuse = inputHandler.getInputSerializer().createInstance();
-		}
-	}
-
-	private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
-		try {
-			if (shouldWait) {
-				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				dataChannel.put(record);
-				return true;
-			}
-		} catch (InterruptedException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
-						StringUtils.stringifyException(e));
-				throw e;
-			}
-			return false;
-		}
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
deleted file mode 100644
index 1c904ca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public interface StreamTaskContext<OUT> {
-
-	StreamConfig getConfig();
-
-	ClassLoader getUserCodeClassLoader();
-
-	<X> MutableObjectIterator<X> getInput(int index);
-
-	<X> IndexedReaderIterator<X> getIndexedInput(int index);
-
-	<X> StreamRecordSerializer<X> getInputSerializer(int index);
-
-	Collector<OUT> getOutputCollector();
-
-	<X, Y> CoReaderIterator<X, Y> getCoReader();
-
-	ExecutionConfig getExecutionConfig();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
deleted file mode 100644
index b56eda3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.messages.CheckpointingMessages;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import akka.actor.ActorRef;
-
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
-		BarrierTransceiver, OperatorStateCarrier {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
-
-	private static int numTasks;
-
-	protected StreamConfig configuration;
-	protected int instanceID;
-	private static int numVertices = 0;
-
-	private InputHandler<IN> inputHandler;
-	protected OutputHandler<OUT> outputHandler;
-	private StreamInvokable<IN, OUT> userInvokable;
-	protected volatile boolean isRunning = false;
-
-	private StreamingRuntimeContext context;
-	private Map<String, OperatorState<?>> states;
-
-	protected ClassLoader userClassLoader;
-
-	private EventListener<TaskEvent> superstepListener;
-
-	public StreamVertex() {
-		userInvokable = null;
-		numTasks = newVertex();
-		instanceID = numTasks;
-		superstepListener = new SuperstepEventListener();
-	}
-
-	protected static int newVertex() {
-		numVertices++;
-		return numVertices;
-	}
-
-	@Override
-	public void registerInputOutput() {
-		initialize();
-		setInputsOutputs();
-		setInvokable();
-	}
-
-	protected void initialize() {
-		this.userClassLoader = getUserCodeClassLoader();
-		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.states = new HashMap<String, OperatorState<?>>();
-		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
-	}
-
-	@Override
-	public void broadcastBarrierFromSource(long id) {
-		// Only called at input vertices
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Received barrier from jobmanager: " + id);
-		}
-		actOnBarrier(id);
-	}
-
-	/**
-	 * This method is called to confirm that a barrier has been fully processed.
-	 * It sends an acknowledgment to the jobmanager. In the current version if
-	 * there is user state it also checkpoints the state to the jobmanager.
-	 */
-	@Override
-	public void confirmBarrier(long barrierID) throws IOException {
-
-		if (configuration.getStateMonitoring() && !states.isEmpty()) {
-			getEnvironment().getJobManager().tell(
-					new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
-							new LocalStateHandle(states)), ActorRef.noSender());
-		} else {
-			getEnvironment().getJobManager().tell(
-					new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
-							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
-		}
-
-	}
-
-	public void setInputsOutputs() {
-		inputHandler = new InputHandler<IN>(this);
-		outputHandler = new OutputHandler<OUT>(this);
-	}
-
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this);
-	}
-
-	public String getName() {
-		return getEnvironment().getTaskName();
-	}
-
-	public int getInstanceID() {
-		return instanceID;
-	}
-
-	public StreamingRuntimeContext createRuntimeContext(String taskName,
-			Map<String, OperatorState<?>> states) {
-		Environment env = getEnvironment();
-		return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
-				getExecutionConfig(), states);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		this.isRunning = true;
-
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		try {
-			userInvokable.setRuntimeContext(context);
-
-			operatorOpen = true;
-			openOperator();
-
-			userInvokable.invoke();
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
-			}
-
-		} catch (Exception e) {
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				} catch (Throwable t) {
-				}
-			}
-
-			if (LOG.isErrorEnabled()) {
-				LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
-			}
-			throw e;
-		} finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
-
-	}
-
-	protected void openOperator() throws Exception {
-		userInvokable.open(getTaskConfiguration());
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.setRuntimeContext(context);
-			invokable.open(getTaskConfiguration());
-		}
-	}
-
-	protected void closeOperator() throws Exception {
-		userInvokable.close();
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.close();
-		}
-	}
-
-	protected void clearBuffers() throws IOException {
-		if (outputHandler != null) {
-			outputHandler.clearWriters();
-		}
-		if (inputHandler != null) {
-			inputHandler.clearReaders();
-		}
-	}
-
-	@Override
-	public void cancel() {
-		if (userInvokable != null) {
-			userInvokable.cancel();
-		}
-	}
-
-	@Override
-	public StreamConfig getConfig() {
-		return configuration;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		if (index == 0) {
-			return (MutableObjectIterator<X>) inputHandler.getInputIter();
-		} else {
-			throw new IllegalArgumentException("There is only 1 input");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
-		if (index == 0) {
-			return (IndexedReaderIterator<X>) inputHandler.getInputIter();
-		} else {
-			throw new IllegalArgumentException("There is only 1 input");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-		if (index == 0) {
-			return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
-		} else {
-			throw new IllegalArgumentException("There is only 1 input");
-		}
-	}
-
-	@Override
-	public Collector<OUT> getOutputCollector() {
-		return outputHandler.getCollector();
-	}
-
-	@Override
-	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-		throw new IllegalArgumentException("CoReader not available");
-	}
-
-	public EventListener<TaskEvent> getSuperstepListener() {
-		return this.superstepListener;
-	}
-
-	/**
-	 * Method to be called when a barrier is received from all the input
-	 * channels. It should broadcast the barrier to the output operators,
-	 * checkpoint the state and send an ack.
-	 * 
-	 * @param id
-	 */
-	private synchronized void actOnBarrier(long id) {
-		if (isRunning) {
-			try {
-				outputHandler.broadcastBarrier(id);
-				confirmBarrier(id);
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
-				}
-			} catch (Exception e) {
-				// Only throw any exception if the vertex is still running
-				if (isRunning) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
-	}
-
-	/**
-	 * Re-injects the user states into the map
-	 */
-	@Override
-	public void injectState(StateHandle stateHandle) {
-		this.states.putAll(stateHandle.getState(userClassLoader));
-	}
-
-	private class SuperstepEventListener implements EventListener<TaskEvent> {
-
-		@Override
-		public void onEvent(TaskEvent event) {
-			actOnBarrier(((StreamingSuperstep) event).getId());
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
deleted file mode 100644
index ed8b91e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-/**
- * An exception that is thrown by the stream verices when encountering an
- * illegal condition.
- */
-public class StreamVertexException extends RuntimeException {
-
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8392043527067472439L;
-
-	/**
-	 * Creates a compiler exception with no message and no cause.
-	 */
-	public StreamVertexException() {
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and no cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 */
-	public StreamVertexException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a compiler exception with the given cause and no message.
-	 * 
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamVertexException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamVertexException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
deleted file mode 100644
index ff876b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
- * operators.
- */
-public class StreamingRuntimeContext extends RuntimeUDFContext {
-
-	private final Environment env;
-	private final Map<String, OperatorState<?>> operatorStates;
-
-	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-			ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
-		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
-				executionConfig, env.getCopyTask());
-		this.env = env;
-		this.operatorStates = operatorStates;
-	}
-
-	/**
-	 * Returns the operator state registered by the given name for the operator.
-	 * 
-	 * @param name
-	 *            Name of the operator state to be returned.
-	 * @return The operator state.
-	 */
-	public OperatorState<?> getState(String name) {
-		if (operatorStates == null) {
-			throw new RuntimeException("No state has been registered for this operator.");
-		} else {
-			OperatorState<?> state = operatorStates.get(name);
-			if (state != null) {
-				return state;
-			} else {
-				throw new RuntimeException("No state has been registered for the name: " + name);
-			}
-		}
-	}
-
-	/**
-	 * Returns whether there is a state stored by the given name
-	 */
-	public boolean containsState(String name) {
-		return operatorStates.containsKey(name);
-	}
-
-	/**
-	 * This is a beta feature </br></br> Register an operator state for this
-	 * operator by the given name. This name can be used to retrieve the state
-	 * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
-	 * obtain the {@link StreamingRuntimeContext} from the user-defined function
-	 * use the {@link RichFunction#getRuntimeContext()} method.
-	 * 
-	 * @param name
-	 *            The name of the operator state.
-	 * @param state
-	 *            The state to be registered for this name.
-	 */
-	public void registerState(String name, OperatorState<?> state) {
-		if (state == null) {
-			throw new RuntimeException("Cannot register null state");
-		} else {
-			if (operatorStates.containsKey(name)) {
-				throw new RuntimeException("State is already registered");
-			} else {
-				operatorStates.put(name, state);
-			}
-		}
-	}
-
-	/**
-	 * Returns the input split provider associated with the operator.
-	 * 
-	 * @return The input split provider.
-	 */
-	public InputSplitProvider getInputSplitProvider() {
-		return env.getInputSplitProvider();
-	}
-
-	/**
-	 * Returns the stub parameters associated with the {@link TaskConfig} of the
-	 * operator.
-	 * 
-	 * @return The stub parameters.
-	 */
-	public Configuration getTaskStubParameters() {
-		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
deleted file mode 100644
index d46ca79..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
-
-public class StreamingSuperstep extends TaskEvent {
-
-	protected long id;
-
-	public StreamingSuperstep() {
-
-	}
-
-	public StreamingSuperstep(long id) {
-		this.id = id;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof StreamingSuperstep)) {
-			return false;
-		} else {
-			return ((StreamingSuperstep) other).id == this.id;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index abe5298..cab2bef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
  * This interface extends the {@link TriggerPolicy} interface with functionality
  * for active triggers. Active triggers can act in two ways:
  * 
- * 1) Whenever an element arrives at the invokable, the
+ * 1) Whenever an element arrives at the operator, the
  * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
  * first. It can return zero ore more fake data points which will be added
  * before the currently arrived real element gets processed. This allows to
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
  * datapoint is always considered as triggered.
  * 
  * 2) An active trigger has a factory method for a runnable. This factory method
- * gets called at the start up of the invokable. The returned runnable will be
+ * gets called at the start up of the operator. The returned runnable will be
  * executed in its own thread and can submit fake elements at any time threw an
  * {@link ActiveTriggerCallback}. This allows to have time based triggers based
  * on any system internal time measure. Triggers are not called on fake
@@ -44,7 +44,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 
 	/**
-	 * Whenever an element arrives at the invokable, the
+	 * Whenever an element arrives at the operator, the
 	 * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
 	 * first. It can return zero ore more fake data points which will be added
 	 * before the the currently arrived real element gets processed. This allows
@@ -53,7 +53,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 	 * datapoint is always considered as triggered.
 	 * 
 	 * @param datapoint
-	 *            the data point which arrived at the invokable
+	 *            the data point which arrived at the operator
 	 * @return zero ore more fake data points which will be added before the the
 	 *         currently arrived real element gets processed.
 	 */
@@ -61,7 +61,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 
 	/**
 	 * This is the factory method for a runnable. This factory method gets
-	 * called at the start up of the invokable. The returned runnable will be
+	 * called at the start up of the operator. The returned runnable will be
 	 * executed in its own thread and can submit fake elements at any time threw
 	 * an {@link ActiveTriggerCallback}. This allows to have time based triggers
 	 * based on any system internal time measure. Triggers are not called on

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
index 1937b3f..6bc5072 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 
 /**
- * When used in {@link GroupedWindowInvokable}, eviction policies must
+ * When used in grouped windowing, eviction policies must
  * provide a clone method. Eviction policies get cloned to provide an own
  * instance for each group and respectively each individual element buffer as
  * groups maintain their own buffers with the elements belonging to the

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
index 6a04461..5b5e20b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 
 /**
- * When used in {@link GroupedWindowInvokable}, trigger policies can provide
+ * When used in grouped windowing, trigger policies can provide
  * a clone method. Cloneable triggers can can be used in a distributed manner,
  * which means they get cloned to provide an own instance for each group. This
  * allows each group to trigger individually and only based on the elements

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
index c224ad4..b95053a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
@@ -40,7 +40,7 @@ public interface EvictionPolicy<DATA> extends Serializable {
 	 * @param triggered
 	 *            Information whether the UDF was triggered or not
 	 * @param bufferSize
-	 *            The current size of the element buffer at the invokable
+	 *            The current size of the element buffer at the operator
 	 * @return The number of elements to be deleted from the buffer
 	 */
 	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize);

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
deleted file mode 100644
index 6c198a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class encapsulating the functionality that is necessary to sync inputs on
- * superstep barriers. Once a barrier is received from an input channel, whe
- * should not process further buffers from that channel until we received the
- * barrier from all other channels as well. To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
- * 
- */
-public class BarrierBuffer {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
-	private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
-	private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
-
-	private Set<Integer> blockedChannels = new HashSet<Integer>();
-	private int totalNumberOfInputChannels;
-
-	private StreamingSuperstep currentSuperstep;
-	private boolean superstepStarted;
-
-	private AbstractReader reader;
-
-	private InputGate inputGate;
-
-	private SpillReader spillReader;
-	private BufferSpiller bufferSpiller;
-
-	private boolean inputFinished = false;
-
-	private BufferOrEvent endOfStreamEvent = null;
-
-	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
-		this.inputGate = inputGate;
-		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.reader = reader;
-		try {
-			this.bufferSpiller = new BufferSpiller();
-			this.spillReader = new SpillReader();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	/**
-	 * Starts the next superstep in the buffer
-	 * 
-	 * @param superstep
-	 *            The next superstep
-	 */
-	protected void startSuperstep(StreamingSuperstep superstep) {
-		this.currentSuperstep = superstep;
-		this.superstepStarted = true;
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Superstep started with id: " + superstep.getId());
-		}
-	}
-
-	/**
-	 * Get then next non-blocked non-processed BufferOrEvent. Returns null if
-	 * not available.
-	 * 
-	 * @throws IOException
-	 */
-	protected BufferOrEvent getNonProcessed() throws IOException {
-		SpillingBufferOrEvent nextNonprocessed;
-
-		while ((nextNonprocessed = nonprocessed.poll()) != null) {
-			BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
-			if (isBlocked(boe.getChannelIndex())) {
-				blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
-			} else {
-				return boe;
-			}
-		}
-
-		return null;
-	}
-
-	/**
-	 * Checks whether a given channel index is blocked for this inputgate
-	 * 
-	 * @param channelIndex
-	 *            The channel index to check
-	 */
-	protected boolean isBlocked(int channelIndex) {
-		return blockedChannels.contains(channelIndex);
-	}
-
-	/**
-	 * Checks whether all channels are blocked meaning that barriers are
-	 * received from all channels
-	 */
-	protected boolean isAllBlocked() {
-		return blockedChannels.size() == totalNumberOfInputChannels;
-	}
-
-	/**
-	 * Returns the next non-blocked BufferOrEvent. This is a blocking operator.
-	 */
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
-		// If there are non-processed buffers from the previously blocked ones,
-		// we get the next
-		BufferOrEvent bufferOrEvent = getNonProcessed();
-
-		if (bufferOrEvent != null) {
-			return bufferOrEvent;
-		} else if (blockedNonprocessed.isEmpty() && inputFinished) {
-			return endOfStreamEvent;
-		} else {
-			// If no non-processed, get new from input
-			while (true) {
-				if (!inputFinished) {
-					// We read the next buffer from the inputgate
-					bufferOrEvent = inputGate.getNextBufferOrEvent();
-
-					if (!bufferOrEvent.isBuffer()
-							&& bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
-						if (inputGate.isFinished()) {
-							// store the event for later if the channel is
-							// closed
-							endOfStreamEvent = bufferOrEvent;
-							inputFinished = true;
-						}
-
-					} else {
-						if (isBlocked(bufferOrEvent.getChannelIndex())) {
-							// If channel blocked we just store it
-							blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent,
-									bufferSpiller, spillReader));
-						} else {
-							return bufferOrEvent;
-						}
-					}
-				} else {
-					actOnAllBlocked();
-					return getNextNonBlocked();
-				}
-			}
-		}
-	}
-
-	/**
-	 * Blocks the given channel index, from which a barrier has been received.
-	 * 
-	 * @param channelIndex
-	 *            The channel index to block.
-	 */
-	protected void blockChannel(int channelIndex) {
-		if (!blockedChannels.contains(channelIndex)) {
-			blockedChannels.add(channelIndex);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Channel blocked with index: " + channelIndex);
-			}
-			if (isAllBlocked()) {
-				actOnAllBlocked();
-			}
-
-		} else {
-			throw new RuntimeException("Tried to block an already blocked channel");
-		}
-	}
-
-	/**
-	 * Releases the blocks on all channels.
-	 * 
-	 * @throws IOException
-	 */
-	protected void releaseBlocks() {
-		if (!nonprocessed.isEmpty()) {
-			// sanity check
-			throw new RuntimeException("Error in barrier buffer logic");
-		}
-		nonprocessed = blockedNonprocessed;
-		blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
-
-		try {
-			spillReader.setSpillFile(bufferSpiller.getSpillFile());
-			bufferSpiller.resetSpillFile();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		blockedChannels.clear();
-		superstepStarted = false;
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("All barriers received, blocks released");
-		}
-	}
-
-	/**
-	 * Method that is executed once the barrier has been received from all
-	 * channels.
-	 */
-	protected void actOnAllBlocked() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Publishing barrier to the vertex");
-		}
-
-		if (currentSuperstep != null) {
-			reader.publish(currentSuperstep);
-		}
-
-		releaseBlocks();
-	}
-
-	/**
-	 * Processes a streaming superstep event
-	 * 
-	 * @param bufferOrEvent
-	 *            The BufferOrEvent containing the event
-	 */
-	public void processSuperstep(BufferOrEvent bufferOrEvent) {
-		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
-		if (!superstepStarted) {
-			startSuperstep(superstep);
-		}
-		blockChannel(bufferOrEvent.getChannelIndex());
-	}
-
-	public void cleanup() throws IOException {
-		bufferSpiller.close();
-		File spillfile1 = bufferSpiller.getSpillFile();
-		if (spillfile1 != null) {
-			spillfile1.delete();
-		}
-
-		spillReader.close();
-		File spillfile2 = spillReader.getSpillFile();
-		if (spillfile2 != null) {
-			spillfile2.delete();
-		}
-	}
-
-	public String toString() {
-		return nonprocessed.toString() + blockedNonprocessed.toString();
-	}
-
-	public boolean isEmpty() {
-		return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty();
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
deleted file mode 100644
index 3ee2508..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
-	/**
-	 * Singleton instance
-	 */
-	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
-	private BlockingQueueBroker() {
-	}
-
-	/**
-	 * retrieve singleton instance
-	 */
-	public static Broker<BlockingQueue<StreamRecord>> instance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
deleted file mode 100644
index b028ea7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.Random;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.StringUtils;
-
-public class BufferSpiller {
-
-	protected static Random rnd = new Random();
-
-	private File spillFile;
-	protected FileChannel spillingChannel;
-	private String tempDir;
-
-	public BufferSpiller() throws IOException {
-		String tempDirString = GlobalConfiguration.getString(
-				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-		String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
-		tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
-		createSpillingChannel();
-	}
-
-	/**
-	 * Dumps the contents of the buffer to disk and recycles the buffer.
-	 */
-	public void spill(Buffer buffer) throws IOException {
-		try {
-			spillingChannel.write(buffer.getNioBuffer());
-			buffer.recycle();
-		} catch (IOException e) {
-			close();
-			throw new IOException(e);
-		}
-
-	}
-
-	@SuppressWarnings("resource")
-	private void createSpillingChannel() throws IOException {
-		this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
-	}
-
-	private static String randomString(Random random) {
-		final byte[] bytes = new byte[20];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
-
-	public void close() throws IOException {
-		if (spillingChannel != null && spillingChannel.isOpen()) {
-			spillingChannel.close();
-		}
-	}
-
-	public void resetSpillFile() throws IOException {
-		close();
-		createSpillingChannel();
-	}
-
-	public File getSpillFile() {
-		return spillFile;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
deleted file mode 100644
index ed90c03..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
-
-/**
- * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
- * input types.
- */
-public class CoReaderIterator<T1, T2> {
-
-	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
-																									// source
-
-	protected final ReusingDeserializationDelegate<T1> delegate1;
-	protected final ReusingDeserializationDelegate<T2> delegate2;
-
-	public CoReaderIterator(
-			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
-			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
-		this.reader = reader;
-		this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
-		this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
-	}
-
-	public int next(T1 target1, T2 target2) throws IOException {
-		this.delegate1.setInstance(target1);
-		this.delegate2.setInstance(target2);
-
-		try {
-			return this.reader.getNextRecord(this.delegate1, this.delegate2);
-
-		} catch (InterruptedException e) {
-			throw new IOException("Reader interrupted.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
deleted file mode 100644
index 25cb25d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
-		AbstractReader implements EventListener<InputGate>, StreamingReader {
-
-	private final InputGate bufferReader1;
-
-	private final InputGate bufferReader2;
-
-	private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
-
-	private LinkedList<Integer> processed = new LinkedList<Integer>();
-
-	private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
-
-	private RecordDeserializer<T1> reader1currentRecordDeserializer;
-
-	private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
-
-	private RecordDeserializer<T2> reader2currentRecordDeserializer;
-
-	// 0 => none, 1 => reader (T1), 2 => reader (T2)
-	private int currentReaderIndex;
-
-	private boolean hasRequestedPartitions;
-
-	protected CoBarrierBuffer barrierBuffer1;
-	protected CoBarrierBuffer barrierBuffer2;
-
-	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
-		super(new UnionInputGate(inputgate1, inputgate2));
-
-		this.bufferReader1 = inputgate1;
-		this.bufferReader2 = inputgate2;
-
-		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
-				.getNumberOfInputChannels()];
-		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
-				.getNumberOfInputChannels()];
-
-		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
-			reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
-		}
-
-		for (int i = 0; i < reader2RecordDeserializers.length; i++) {
-			reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
-		}
-
-		inputgate1.registerListener(this);
-		inputgate2.registerListener(this);
-
-		barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
-		barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
-
-		barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
-		barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
-	}
-
-	public void requestPartitionsOnce() throws IOException, InterruptedException {
-		if (!hasRequestedPartitions) {
-			bufferReader1.requestPartitions();
-			bufferReader2.requestPartitions();
-
-			hasRequestedPartitions = true;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
-
-		requestPartitionsOnce();
-
-		while (true) {
-			if (currentReaderIndex == 0) {
-				if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
-					return 0;
-				}
-
-				currentReaderIndex = getNextReaderIndexBlocking();
-
-			}
-
-			if (currentReaderIndex == 1) {
-				while (true) {
-					if (reader1currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
-								.getNextRecord(target1);
-
-						if (result.isBufferConsumed()) {
-							reader1currentRecordDeserializer.getCurrentBuffer().recycle();
-							reader1currentRecordDeserializer = null;
-
-							currentReaderIndex = 0;
-						}
-
-						if (result.isFullRecord()) {
-							return 1;
-						}
-					} else {
-
-						final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
-
-						if (boe.isBuffer()) {
-							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
-									.getChannelIndex()];
-							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						} else if (boe.getEvent() instanceof StreamingSuperstep) {
-							barrierBuffer1.processSuperstep(boe);
-							currentReaderIndex = 0;
-
-							break;
-						} else if (handleEvent(boe.getEvent())) {
-							currentReaderIndex = 0;
-
-							break;
-						}
-					}
-				}
-			} else if (currentReaderIndex == 2) {
-				while (true) {
-					if (reader2currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
-								.getNextRecord(target2);
-
-						if (result.isBufferConsumed()) {
-							reader2currentRecordDeserializer.getCurrentBuffer().recycle();
-							reader2currentRecordDeserializer = null;
-
-							currentReaderIndex = 0;
-						}
-
-						if (result.isFullRecord()) {
-							return 2;
-						}
-					} else {
-						final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
-
-						if (boe.isBuffer()) {
-							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
-									.getChannelIndex()];
-							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						} else if (boe.getEvent() instanceof StreamingSuperstep) {
-							barrierBuffer2.processSuperstep(boe);
-							currentReaderIndex = 0;
-
-							break;
-						} else if (handleEvent(boe.getEvent())) {
-							currentReaderIndex = 0;
-
-							break;
-						}
-					}
-				}
-			} else {
-				throw new IllegalStateException("Bug: unexpected current reader index.");
-			}
-		}
-	}
-
-	protected int getNextReaderIndexBlocking() throws InterruptedException {
-
-		Integer nextIndex = 0;
-
-		while (processed.contains(nextIndex = availableRecordReaders.take())) {
-			processed.remove(nextIndex);
-		}
-
-		if (nextIndex == 1) {
-			if (barrierBuffer1.isAllBlocked()) {
-				availableRecordReaders.addFirst(1);
-				processed.add(2);
-				return 2;
-			} else {
-				return 1;
-			}
-		} else {
-			if (barrierBuffer2.isAllBlocked()) {
-				availableRecordReaders.addFirst(2);
-				processed.add(1);
-				return 1;
-			} else {
-				return 2;
-			}
-
-		}
-
-	}
-
-	// ------------------------------------------------------------------------
-	// Data availability notifications
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void onEvent(InputGate bufferReader) {
-		addToAvailable(bufferReader);
-	}
-
-	protected void addToAvailable(InputGate bufferReader) {
-		if (bufferReader == bufferReader1) {
-			availableRecordReaders.add(1);
-		} else if (bufferReader == bufferReader2) {
-			availableRecordReaders.add(2);
-		}
-	}
-
-	public void clearBuffers() {
-		for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-		for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-	}
-
-	private class CoBarrierBuffer extends BarrierBuffer {
-
-		private CoBarrierBuffer otherBuffer;
-
-		public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
-			super(inputGate, reader);
-		}
-
-		public void setOtherBarrierBuffer(CoBarrierBuffer other) {
-			this.otherBuffer = other;
-		}
-
-		@Override
-		protected void actOnAllBlocked() {
-			if (otherBuffer.isAllBlocked()) {
-				super.actOnAllBlocked();
-				otherBuffer.releaseBlocks();
-			}
-		}
-
-	}
-
-	public void cleanup() throws IOException {
-		try {
-			barrierBuffer1.cleanup();
-		} finally {
-			barrierBuffer2.cleanup();
-		}
-
-	}
-}


Mime
View raw message