flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [18/19] flink git commit: [streaming] Major internal renaming and restructure
Date Wed, 15 Apr 2015 09:38:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
deleted file mode 100644
index 1502ec7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.StreamGraph.StreamLoop;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGenerator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
-
-	private StreamGraph streamGraph;
-
-	private Map<Integer, AbstractJobVertex> jobVertices;
-	private JobGraph jobGraph;
-	private Collection<Integer> builtVertices;
-
-	private List<StreamEdge> physicalEdgesInOrder;
-
-	private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
-	private Map<Integer, StreamConfig> vertexConfigs;
-	private Map<Integer, String> chainedNames;
-
-	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
-		this.streamGraph = streamGraph;
-	}
-
-	private void init() {
-		this.jobVertices = new HashMap<Integer, AbstractJobVertex>();
-		this.builtVertices = new HashSet<Integer>();
-		this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
-		this.vertexConfigs = new HashMap<Integer, StreamConfig>();
-		this.chainedNames = new HashMap<Integer, String>();
-		this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
-	}
-
-	public JobGraph createJobGraph(String jobName) {
-		jobGraph = new JobGraph(jobName);
-
-		// Turn lazy scheduling off
-		jobGraph.setScheduleMode(ScheduleMode.ALL);
-		jobGraph.setJobType(JobGraph.JobType.STREAMING);
-		jobGraph.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-		jobGraph.setCheckpointingInterval(streamGraph.getCheckpointingInterval());
-
-		if (jobGraph.isCheckpointingEnabled()) {
-			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
-			if (executionRetries != -1) {
-				jobGraph.setNumberOfExecutionRetries(executionRetries);
-			} else {
-				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
-			}
-		}
-		init();
-
-		setChaining();
-
-		setPhysicalEdges();
-
-		setSlotSharing();
-
-		return jobGraph;
-	}
-
-	private void setPhysicalEdges() {
-		Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
-
-		for (StreamEdge edge : physicalEdgesInOrder) {
-			int target = edge.getTargetID();
-
-			List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
-
-			// create if not set
-			if (inEdges == null) {
-				inEdges = new ArrayList<StreamEdge>();
-				physicalInEdgesInOrder.put(target, inEdges);
-			}
-
-			inEdges.add(edge);
-		}
-
-		for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
-			int vertex = inEdges.getKey();
-			List<StreamEdge> edgeList = inEdges.getValue();
-
-			vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
-		}
-	}
-
-	private void setChaining() {
-		for (Integer sourceName : streamGraph.getSourceIDs()) {
-			createChain(sourceName, sourceName);
-		}
-	}
-
-	private List<StreamEdge> createChain(Integer startNode, Integer current) {
-
-		if (!builtVertices.contains(startNode)) {
-
-			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
-
-			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
-			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
-
-			for (StreamEdge outEdge : streamGraph.getVertex(current).getOutEdges()) {
-				if (isChainable(outEdge)) {
-					chainableOutputs.add(outEdge);
-				} else {
-					nonChainableOutputs.add(outEdge);
-				}
-			}
-
-			for (StreamEdge chainable : chainableOutputs) {
-				transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetID()));
-			}
-
-			for (StreamEdge nonChainable : nonChainableOutputs) {
-				transitiveOutEdges.add(nonChainable);
-				createChain(nonChainable.getTargetID(), nonChainable.getTargetID());
-			}
-
-			chainedNames.put(current, createChainedName(current, chainableOutputs));
-
-			StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode)
-					: new StreamConfig(new Configuration());
-
-			setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
-
-			if (current.equals(startNode)) {
-
-				config.setChainStart();
-				config.setOutEdgesInOrder(transitiveOutEdges);
-				config.setOutEdges(streamGraph.getVertex(current).getOutEdges());
-
-				for (StreamEdge edge : transitiveOutEdges) {
-					connect(startNode, edge);
-				}
-
-				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
-
-			} else {
-
-				Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNode);
-
-				if (chainedConfs == null) {
-					chainedConfigs.put(startNode, new HashMap<Integer, StreamConfig>());
-				}
-				chainedConfigs.get(startNode).put(current, config);
-			}
-
-			return transitiveOutEdges;
-
-		} else {
-			return new ArrayList<StreamEdge>();
-		}
-	}
-
-	private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
-		String operatorName = streamGraph.getVertex(vertexID).getOperatorName();
-		if (chainedOutputs.size() > 1) {
-			List<String> outputChainedNames = new ArrayList<String>();
-			for (StreamEdge chainable : chainedOutputs) {
-				outputChainedNames.add(chainedNames.get(chainable.getTargetID()));
-			}
-			String returnOperatorName = operatorName + " -> ("
-					+ StringUtils.join(outputChainedNames, ", ") + ")";
-			return returnOperatorName;
-		} else if (chainedOutputs.size() == 1) {
-			String returnOperatorName = operatorName + " -> "
-					+ chainedNames.get(chainedOutputs.get(0).getTargetID());
-			return returnOperatorName;
-		} else {
-			return operatorName;
-		}
-
-	}
-
-	private StreamConfig createProcessingVertex(Integer vertexID) {
-
-		AbstractJobVertex jobVertex = new AbstractJobVertex(chainedNames.get(vertexID));
-		StreamNode vertex = streamGraph.getVertex(vertexID);
-
-		jobVertex.setInvokableClass(vertex.getJobVertexClass());
-
-		int parallelism = vertex.getParallelism();
-
-		if (parallelism > 0) {
-			jobVertex.setParallelism(parallelism);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
-		}
-
-		if (vertex.getInputFormat() != null) {
-			jobVertex.setInputSplitSource(vertex.getInputFormat());
-		}
-
-		jobVertices.put(vertexID, jobVertex);
-		builtVertices.add(vertexID);
-		jobGraph.addVertex(jobVertex);
-
-		StreamConfig retConfig = new StreamConfig(jobVertex.getConfiguration());
-		retConfig.setOperatorName(chainedNames.get(vertexID));
-		return retConfig;
-	}
-
-	private void setVertexConfig(Integer vertexID, StreamConfig config,
-			List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
-
-		StreamNode vertex = streamGraph.getVertex(vertexID);
-
-		config.setVertexID(vertexID);
-		config.setBufferTimeout(vertex.getBufferTimeout());
-
-		config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
-		config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
-		config.setTypeSerializerOut1(vertex.getTypeSerializerOut());
-
-		config.setUserInvokable(vertex.getInvokable());
-		config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
-
-		config.setNumberOfOutputs(nonChainableOutputs.size());
-		config.setNonChainedOutputs(nonChainableOutputs);
-		config.setChainedOutputs(chainableOutputs);
-		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
-
-		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
-
-		if (vertexClass.equals(StreamIterationHead.class)
-				|| vertexClass.equals(StreamIterationTail.class)) {
-			config.setIterationId(streamGraph.getLoopID(vertexID));
-			config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
-		}
-
-		List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
-		allOutputs.addAll(nonChainableOutputs);
-
-		for (StreamEdge output : allOutputs) {
-			config.setSelectedNames(output.getTargetID(),
-					streamGraph.getEdge(vertexID, output.getTargetID()).getSelectedNames());
-		}
-
-		vertexConfigs.put(vertexID, config);
-	}
-
-	private void connect(Integer headOfChain, StreamEdge edge) {
-
-		physicalEdgesInOrder.add(edge);
-
-		Integer downStreamvertexID = edge.getTargetID();
-
-		AbstractJobVertex headVertex = jobVertices.get(headOfChain);
-		AbstractJobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
-
-		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
-
-		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
-
-		StreamPartitioner<?> partitioner = edge.getPartitioner();
-		if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
-			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
-		} else {
-			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
-					headOfChain, downStreamvertexID);
-		}
-	}
-
-	private boolean isChainable(StreamEdge edge) {
-		StreamNode upStreamVertex = edge.getSourceVertex();
-		StreamNode downStreamVertex = edge.getTargetVertex();
-
-		StreamInvokable<?, ?> headInvokable = upStreamVertex.getInvokable();
-		StreamInvokable<?, ?> outInvokable = downStreamVertex.getInvokable();
-
-		return downStreamVertex.getInEdges().size() == 1
-				&& outInvokable != null
-				&& outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
-				&& (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
-						.getChainingStrategy() == ChainingStrategy.ALWAYS)
-				&& (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
-						.getParallelism() == 1)
-				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-				&& streamGraph.isChainingEnabled();
-	}
-
-	private void setSlotSharing() {
-		SlotSharingGroup shareGroup = new SlotSharingGroup();
-
-		for (AbstractJobVertex vertex : jobVertices.values()) {
-			vertex.setSlotSharingGroup(shareGroup);
-		}
-
-		for (StreamLoop loop : streamGraph.getStreamLoops()) {
-			CoLocationGroup ccg = new CoLocationGroup();
-			AbstractJobVertex tail = jobVertices.get(loop.getTail().getID());
-			AbstractJobVertex head = jobVertices.get(loop.getHead().getID());
-
-			ccg.addVertex(head);
-			ccg.addVertex(tail);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
deleted file mode 100644
index ca663bb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
-import org.apache.flink.streaming.partitioner.DistributePartitioner;
-
-public class WindowingOptimizer {
-
-	public static void optimizeGraph(StreamGraph streamGraph) {
-
-		// Share common discrtizers
-		setDiscretizerReuse(streamGraph);
-
-		// Remove unnecessary merges before flatten operators
-		removeMergeBeforeFlatten(streamGraph);
-	}
-
-	@SuppressWarnings("rawtypes")
-	private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
-		Set<Tuple2<Integer, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables();
-		List<Integer> flatteners = new ArrayList<Integer>();
-
-		for (Tuple2<Integer, StreamInvokable<?, ?>> entry : invokables) {
-			if (entry.f1 instanceof WindowFlattener) {
-				flatteners.add(entry.f0);
-			}
-		}
-
-		for (Integer flattenerID : flatteners) {
-			// Flatteners should have exactly one input
-			StreamNode input = streamGraph.getVertex(flattenerID).getInEdges().get(0)
-					.getSourceVertex();
-
-			// Check whether the flatten is applied after a merge
-			if (input.getInvokable() instanceof WindowMerger) {
-
-				// Mergers should have exactly one input
-				StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
-
-				// We connect the merge input to the flattener directly
-				streamGraph.addEdge(mergeInput.getID(), flattenerID,
-						new DistributePartitioner(true), 0, new ArrayList<String>());
-
-				// If the merger is only connected to the flattener we delete it
-				// completely, otherwise we only remove the edge
-				if (input.getOutEdges().size() > 1) {
-					streamGraph.removeEdge(streamGraph.getEdge(input.getID(), flattenerID));
-				} else {
-					streamGraph.removeVertex(input);
-				}
-
-				streamGraph.setParallelism(flattenerID, mergeInput.getParallelism());
-			}
-		}
-
-	}
-
-	private static void setDiscretizerReuse(StreamGraph streamGraph) {
-
-		Set<Tuple2<Integer, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables();
-		List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>();
-
-		// Get the discretizers
-		for (Tuple2<Integer, StreamInvokable<?, ?>> entry : invokables) {
-			if (entry.f1 instanceof StreamDiscretizer) {
-				discretizers.add(new Tuple2<Integer, StreamDiscretizer<?>>(entry.f0,
-						(StreamDiscretizer<?>) entry.f1));
-			}
-		}
-
-		List<Tuple2<StreamDiscretizer<?>, List<Integer>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<Integer>>>();
-
-		for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) {
-			boolean inMatching = false;
-			for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
-				Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getVertex(
-						discretizer.f0).getInEdgeIndices());
-				Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getVertex(
-						matching.f1.get(0)).getInEdgeIndices());
-
-				if (discretizer.f1.equals(matching.f0)
-						&& discretizerInEdges.equals(matchingInEdges)) {
-					matching.f1.add(discretizer.f0);
-					inMatching = true;
-					break;
-				}
-			}
-			if (!inMatching) {
-				List<Integer> matchingNames = new ArrayList<Integer>();
-				matchingNames.add(discretizer.f0);
-				matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<Integer>>(
-						discretizer.f1, matchingNames));
-			}
-		}
-
-		for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
-			List<Integer> matchList = matching.f1;
-			if (matchList.size() > 1) {
-				Integer first = matchList.get(0);
-				for (int i = 1; i < matchList.size(); i++) {
-					replaceDiscretizer(streamGraph, matchList.get(i), first);
-				}
-			}
-		}
-	}
-
-	private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
-			Integer replaceWithID) {
-		// Convert to array to create a copy
-		List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID)
-				.getOutEdges());
-
-		int numOutputs = outEdges.size();
-
-		// Reconnect outputs
-		for (int i = 0; i < numOutputs; i++) {
-			StreamEdge outEdge = outEdges.get(i);
-
-			streamGraph.addEdge(replaceWithID, outEdge.getTargetID(), outEdge.getPartitioner(), 0,
-					new ArrayList<String>());
-		}
-
-		// Remove the other discretizer
-		streamGraph.removeVertex(streamGraph.getVertex(toReplaceID));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index 4a0369c..4f77ecb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import org.apache.flink.streaming.api.StreamEdge;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.util.Collector;
 
 public class CollectorWrapper<OUT> implements Collector<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index c3f694e..2ef7d99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
@@ -36,7 +36,6 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 	private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 	private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
 	private StreamRecord<OUT> streamRecord;
-	private int channelID;
 
 	public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
 			int channelID, SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
@@ -48,7 +47,6 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 		} else {
 			throw new RuntimeException("Serializer cannot be null");
 		}
-		this.channelID = channelID;
 		this.output = output;
 	}
 
@@ -59,7 +57,6 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 	@Override
 	public void collect(OUT record) {
 		streamRecord.setObject(record);
-		streamRecord.newId(channelID);
 		serializationDelegate.setInstance(streamRecord);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index 78ef914..b90cce2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.collector.selector;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.util.Collector;
 
 public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index 1cb20d9..8ca0508 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index 850a1d9..937b69f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.collector.selector;
 
 import java.io.Serializable;
 
-import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.util.Collector;
 
 public interface OutputSelectorWrapper<OUT> extends Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index d34d9d2..dd4a84b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -23,20 +23,20 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoReduceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamReduce;
+import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
@@ -247,7 +247,7 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoMapFunction.class, false, true, getInputType1(), getInputType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
+		return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
 				clean(coMapper)));
 
 	}
@@ -274,7 +274,7 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
+		return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
 				clean(coFlatMapper)));
 	}
 
@@ -300,7 +300,7 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
+		return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
 
 	}
 
@@ -368,21 +368,21 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
+		return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 
 	}
 
-	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
+	protected <OUT> CoStreamOperator<IN1, IN2, OUT> getReduceOperator(
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		CoReduceInvokable<IN1, IN2, OUT> invokable;
+		CoStreamReduce<IN1, IN2, OUT> operator;
 		if (isGrouped) {
-			invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(clean(coReducer), keySelector1,
+			operator = new CoStreamGroupedReduce<IN1, IN2, OUT>(clean(coReducer), keySelector1,
 					keySelector2);
 		} else {
-			invokable = new CoReduceInvokable<IN1, IN2, OUT>(clean(coReducer));
+			operator = new CoStreamReduce<IN1, IN2, OUT>(clean(coReducer));
 		}
-		return invokable;
+		return operator;
 	}
 
 	public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
@@ -397,19 +397,19 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
+		return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 
 	}
 
 	public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-			TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, OUT> functionInvokable) {
+			TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName, outTypeInfo, functionInvokable);
+				environment, functionName, outTypeInfo, operator);
 
-		dataStream1.streamGraph.addCoOperator(returnStream.getId(), functionInvokable, getInputType1(),
+		dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(),
 				getInputType2(), outTypeInfo, functionName);
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0e2a067..94aca48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -50,39 +50,39 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.StreamGraph;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
-import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
+import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator;
+import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
-import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.sink.SocketClientSink;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
-import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.StreamFoldInvokable;
-import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.StreamCounter;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.api.operators.StreamFold;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamReduce;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.partitioner.DistributePartitioner;
-import org.apache.flink.streaming.partitioner.FieldsPartitioner;
-import org.apache.flink.streaming.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -511,7 +511,7 @@ public class DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
 				Utils.getCallLocationName(), true);
 
-		return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper)));
+		return transform("Map", outType, new StreamMap<OUT, R>(clean(mapper)));
 	}
 
 	/**
@@ -535,7 +535,7 @@ public class DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
 				getType(), Utils.getCallLocationName(), true);
 
-		return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
+		return transform("Flat Map", outType, new StreamFlatMap<OUT, R>(clean(flatMapper)));
 
 	}
 
@@ -553,7 +553,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
 
-		return transform("Reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer)));
+		return transform("Reduce", getType(), new StreamReduce<OUT>(clean(reducer)));
 
 	}
 
@@ -573,7 +573,7 @@ public class DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
 				Utils.getCallLocationName(), false);
 
-		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder),
+		return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder),
 				initialValue, outType));
 	}
 
@@ -592,7 +592,7 @@ public class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		return transform("Filter", getType(), new FilterInvokable<OUT>(clean(filter)));
+		return transform("Filter", getType(), new StreamFilter<OUT>(clean(filter)));
 
 	}
 
@@ -902,7 +902,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<Long, ?> count() {
 		TypeInformation<Long> outTypeInfo = BasicTypeInfo.LONG_TYPE_INFO;
 
-		return transform("Count", outTypeInfo, new CounterInvokable<OUT>());
+		return transform("Count", outTypeInfo, new StreamCounter<OUT>());
 	}
 
 	/**
@@ -1175,36 +1175,36 @@ public class DataStream<OUT> {
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
+		StreamReduce<OUT> operator = new StreamReduce<OUT>(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Aggregation", getType(),
-				invokable);
+				operator);
 
 		return returnStream;
 	}
 
 	/**
-	 * Method for passing user defined invokables along with the type
+	 * Method for passing user defined operators along with the type
 	 * information that will transform the DataStream.
 	 * 
 	 * @param operatorName
 	 *            name of the operator, for logging purposes
 	 * @param outTypeInfo
 	 *            the output type of the operator
-	 * @param invokable
+	 * @param operator
 	 *            the object containing the transformation logic
 	 * @param <R>
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> invokable) {
+			TypeInformation<R> outTypeInfo, StreamOperator<OUT, R> operator) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
-				operatorName, outTypeInfo, invokable);
+				operatorName, outTypeInfo, operator);
 
-		streamGraph.addOperator(returnStream.getId(), invokable, getType(), outTypeInfo,
+		streamGraph.addOperator(returnStream.getId(), operator, getType(), outTypeInfo,
 				operatorName);
 
 		connectGraph(inputStream, returnStream.getId(), 0);
@@ -1261,12 +1261,12 @@ public class DataStream<OUT> {
 	 */
 	public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
 
-		StreamInvokable<OUT, OUT> sinkInvokable = new SinkInvokable<OUT>(clean(sinkFunction));
+		StreamOperator<OUT, OUT> sinkOperator = new StreamSink<OUT>(clean(sinkFunction));
 
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
-				sinkInvokable);
+				sinkOperator);
 
-		streamGraph.addOperator(returnStream.getId(), sinkInvokable, getType(), null,
+		streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null,
 				"Stream Sink");
 
 		this.connectGraph(this.copy(), returnStream.getId(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index f064332..38a376e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
  * Represents the end of a DataStream.
@@ -30,8 +30,8 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
 
 	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType,
-			TypeInformation<IN> outTypeInfo, StreamInvokable<?,?> invokable) {
-		super(environment, operatorType, outTypeInfo, invokable);
+			TypeInformation<IN> outTypeInfo, StreamOperator<?,?> operator) {
+		super(environment, operatorType, outTypeInfo, operator);
 	}
 
 	protected DataStreamSink(DataStream<IN> dataStream) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index bc416ba..56e4893 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -32,11 +32,11 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
 	boolean isParallel;
 
 	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType,
-			TypeInformation<OUT> outTypeInfo, StreamInvokable<?, OUT> invokable,
+			TypeInformation<OUT> outTypeInfo, StreamOperator<?, OUT> operator,
 			boolean isParallel, String sourceName) {
-		super(environment, operatorType, outTypeInfo, invokable);
+		super(environment, operatorType, outTypeInfo, operator);
 
-		environment.getStreamGraph().addSource(getId(), invokable, null, outTypeInfo,
+		environment.getStreamGraph().addSource(getId(), operator, null, outTypeInfo,
 				sourceName);
 
 		this.isParallel = isParallel;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 2ac60fe..2a58f60 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -29,22 +29,22 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.windowing.EmptyWindowFilter;
-import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelGroupedMerge;
-import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelMerge;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFolder;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartExtractor;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
+import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
+import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
+import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
+import org.apache.flink.streaming.api.operators.windowing.WindowFolder;
+import org.apache.flink.streaming.api.operators.windowing.WindowMapper;
+import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
+import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
+import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
+import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
@@ -93,7 +93,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 		// If we merged a non-grouped reduce transformation we need to reduce
 		// again
-		if (!isGrouped() && out.discretizedStream.invokable instanceof WindowMerger) {
+		if (!isGrouped() && out.discretizedStream.operator instanceof WindowMerger) {
 			return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
 					new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
 		} else {
@@ -147,7 +147,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 				.addCoFunction(
 						"CoFlatMap",
 						reduced.discretizedStream.getType(),
-						new CoFlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
+						new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
 								parallelMerger));
 	}
 
@@ -182,15 +182,15 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 	private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
 			String operatorName, TypeInformation<R> retType,
-			StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> invokable) {
+			StreamOperator<StreamWindow<OUT>, StreamWindow<R>> operator) {
 
 		return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType),
-				invokable), transformation);
+				operator), transformation);
 	}
 
 	private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
 		return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
-				new FilterInvokable<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())
+				new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())
 						.withoutInputCopy()), input.isPartitioned);
 	}
 
@@ -198,7 +198,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
 		return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
 				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-				new FlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+				new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
 						new WindowPartExtractor<OUT>()).withoutInputCopy());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 034425d..64d546a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.invokable.operator.GroupedFoldInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
  * A GroupedDataStream represents a {@link DataStream} which has been
@@ -71,7 +71,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 	@Override
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(
+		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<OUT>(
 				clean(reducer), keySelector));
 	}
 
@@ -97,7 +97,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
 				Utils.getCallLocationName(), false);
 
-		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder),
+		return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder),
 				keySelector, initialValue, outType));
 	}
 
@@ -214,11 +214,11 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	@Override
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
+		StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate),
 				keySelector);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
-				getType(), invokable);
+				getType(), operator);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 3d29569..c9db12d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
  * The iterative data stream represents the start of an iteration in a
@@ -78,11 +78,11 @@ public class IterativeDataStream<IN> extends
 
 	@Override
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, StreamInvokable<IN, R> invokable) {
+			TypeInformation<R> outTypeInfo, StreamOperator<IN, R> operator) {
 
 		// We call the superclass transform method
 		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,
-				invokable);
+				operator);
 
 		// Then we add a source that will take care of receiving feedback tuples
 		// from the tail

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index cebebe0..aa70e3f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -38,13 +38,13 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		DataStream<OUT> {
 
 	protected boolean isSplit;
-	protected StreamInvokable<?, ?> invokable;
+	protected StreamOperator<?, ?> operator;
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
-			String operatorType, TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable) {
+			String operatorType, TypeInformation<OUT> outTypeInfo, StreamOperator<?, ?> operator) {
 		super(environment, operatorType, outTypeInfo);
 		this.isSplit = false;
-		this.invokable = invokable;
+		this.operator = operator;
 	}
 
 	@SuppressWarnings("unchecked")
@@ -52,7 +52,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		super(dataStream);
 		if (dataStream instanceof SingleOutputStreamOperator) {
 			this.isSplit = ((SingleOutputStreamOperator<OUT, ?>) dataStream).isSplit;
-			this.invokable = ((SingleOutputStreamOperator<OUT, ?>) dataStream).invokable;
+			this.operator = ((SingleOutputStreamOperator<OUT, ?>) dataStream).operator;
 		}
 	}
 
@@ -119,7 +119,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	public SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) {
-		this.invokable.setChainingStrategy(strategy);
+		this.operator.setChainingStrategy(strategy);
 		return this;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 915ca30..0243344 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -45,7 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable;
+import org.apache.flink.streaming.api.operators.StreamProject;
 
 public class StreamProjection<IN> {
 
@@ -83,7 +83,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple1<T0>>(
+		return dataStream.transform("Projection", outType, new StreamProject<IN, Tuple1<T0>>(
 				fieldIndexes, outType));
 	}
 
@@ -112,7 +112,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
+				new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -142,7 +142,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
+				new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -174,7 +174,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
+				new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -207,7 +207,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
+				new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -244,7 +244,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
+				new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -284,7 +284,7 @@ public class StreamProjection<IN> {
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.transform("Projection", outType,
-						new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
+						new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
 								outType));
 	}
 
@@ -326,7 +326,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
+				new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
 						outType));
 	}
 
@@ -370,7 +370,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
+				new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
 						outType));
 	}
 
@@ -416,7 +416,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
+				new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
 						fieldIndexes, outType));
 	}
 
@@ -466,7 +466,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream.transform("Projection", outType,
-				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+				new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
 						fieldIndexes, outType));
 	}
 
@@ -521,7 +521,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
+						new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
 								fieldIndexes, outType));
 	}
 
@@ -578,7 +578,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
+						new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
 								fieldIndexes, outType));
 	}
 
@@ -637,7 +637,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
+						new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
 								fieldIndexes, outType));
 	}
 
@@ -699,7 +699,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
+						new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
 								fieldIndexes, outType));
 	}
 
@@ -763,7 +763,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
+						new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
 								fieldIndexes, outType));
 	}
 
@@ -829,7 +829,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
+						new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
 								fieldIndexes, outType));
 	}
 
@@ -897,7 +897,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
+						new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
 								fieldIndexes, outType));
 	}
 
@@ -968,7 +968,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
+						new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
 								fieldIndexes, outType));
 	}
 
@@ -1041,7 +1041,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
+						new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
 								fieldIndexes, outType));
 	}
 
@@ -1117,7 +1117,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
+						new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
 								fieldIndexes, outType));
 	}
 
@@ -1195,7 +1195,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
+						new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
 								fieldIndexes, outType));
 	}
 
@@ -1276,7 +1276,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
+						new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
 								fieldIndexes, outType));
 	}
 
@@ -1359,7 +1359,7 @@ public class StreamProjection<IN> {
 				.transform(
 						"Projection",
 						outType,
-						new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
+						new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
 								fieldIndexes, outType));
 	}
 
@@ -1445,7 +1445,7 @@ public class StreamProjection<IN> {
 						"Projection",
 
 						outType,
-						new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
+						new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
 								fieldIndexes, outType));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 027f318..15e0179 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -31,18 +31,18 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.RichWindowMapFunction;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedActiveDiscretizer;
-import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
-import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedWindowBufferInvokable;
-import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferInvokable;
+import org.apache.flink.streaming.api.functions.RichWindowMapFunction;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer;
+import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
+import org.apache.flink.streaming.api.operators.windowing.GroupedWindowBuffer;
+import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
+import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
@@ -378,9 +378,9 @@ public class WindowedDataStream<OUT> {
 	private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
 			WindowBuffer<OUT> windowBuffer) {
 
-		StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
+		StreamOperator<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
 
-		StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
+		StreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> bufferOperator = getBufferOperator(windowBuffer);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
@@ -392,7 +392,7 @@ public class WindowedDataStream<OUT> {
 				.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
 				.setParallelism(parallelism)
 				.transform(windowBuffer.getClass().getSimpleName(),
-						new StreamWindowTypeInfo<OUT>(getType()), bufferInvokable)
+						new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
 				.setParallelism(parallelism), groupByKey, transformation, false);
 
 	}
@@ -452,7 +452,7 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Based on the defined policies, returns the stream discretizer to be used
 	 */
-	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
+	private StreamOperator<OUT, WindowEvent<OUT>> getDiscretizer() {
 		if (discretizerKey == null) {
 			return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
 		} else if (getTrigger() instanceof CentralActiveTrigger) {
@@ -467,12 +467,12 @@ public class WindowedDataStream<OUT> {
 
 	}
 
-	private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(
+	private StreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> getBufferOperator(
 			WindowBuffer<OUT> windowBuffer) {
 		if (discretizerKey == null) {
-			return new WindowBufferInvokable<OUT>(windowBuffer);
+			return new StreamWindowBuffer<OUT>(windowBuffer);
 		} else {
-			return new GroupedWindowBufferInvokable<OUT>(windowBuffer, discretizerKey);
+			return new GroupedWindowBuffer<OUT>(windowBuffer, discretizerKey);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
new file mode 100644
index 0000000..dbd295f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
+
+public class StreamCrossOperator<I1, I2> extends
+		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
+
+	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		super(input1, input2);
+	}
+
+	protected <F> F clean(F f) {
+		if (input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
+			ClosureCleaner.clean(f, true);
+		}
+		ClosureCleaner.ensureSerializable(f);
+		return f;
+	}
+
+	@Override
+	protected CrossWindow<I1, I2> createNextWindowOperator() {
+
+		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
+				clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
+				crossWindowFunction,
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
+				slideInterval, timeStamp1, timeStamp2));
+	}
+
+	public static class CrossWindow<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements
+			TemporalWindow<CrossWindow<I1, I2>> {
+
+		private StreamCrossOperator<I1, I2> op;
+
+		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) {
+			super(ds);
+			this.op = op;
+		}
+
+		public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+			return every(timeUnit.toMillis(length));
+		}
+
+		@SuppressWarnings("unchecked")
+		public CrossWindow<I1, I2> every(long length) {
+			((CoStreamWindow<I1, I2, ?>) streamGraph.getVertex(id).getOperator())
+					.setSlideSize(length);
+			return this;
+		}
+
+		/**
+		 * Finalizes a temporal Cross transformation by applying a
+		 * {@link CrossFunction} to each pair of crossed elements.<br/>
+		 * Each CrossFunction call returns exactly one element.
+		 * 
+		 * @param function
+		 *            The CrossFunction that is called for each pair of crossed
+		 *            elements.
+		 * @return The crossed data streams
+		 * 
+		 */
+		@SuppressWarnings("unchecked")
+		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
+			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
+					op.input1.getType(), op.input2.getType());
+
+			CoStreamWindow<I1, I2, R> operator = new CoStreamWindow<I1, I2, R>(
+					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
+					op.slideInterval, op.timeStamp1, op.timeStamp2);
+
+			streamGraph.setOperator(id, operator);
+
+			return ((SingleOutputStreamOperator<R, ?>) this).returns(outTypeInfo);
+
+		}
+
+	}
+
+}


Mime
View raw message