flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [3/5] incubator-flink git commit: [streaming] Temporal operator windowing syntax update
Date Tue, 06 Jan 2015 14:43:22 GMT
[streaming] Temporal operator windowing syntax update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b0a2e4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b0a2e4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b0a2e4a1

Branch: refs/heads/master
Commit: b0a2e4a14e0161e753ec3296d4ead8e46ac8e303
Parents: 92ceacd
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Jan 5 15:24:59 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jan 6 15:09:04 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |   4 +
 .../streaming/api/datastream/DataStream.java    |   4 +-
 .../api/datastream/DataStreamSink.java          |   2 +-
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../api/datastream/IterativeDataStream.java     |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   2 +-
 .../api/datastream/StreamCrossOperator.java     |  86 ------
 .../api/datastream/StreamJoinOperator.java      | 257 ------------------
 .../api/datastream/TemporalOperator.java        | 102 -------
 .../temporaloperator/StreamCrossOperator.java   | 101 +++++++
 .../temporaloperator/StreamJoinOperator.java    | 272 +++++++++++++++++++
 .../temporaloperator/TemporalOperator.java      | 124 +++++++++
 .../temporaloperator/TemporalWindow.java        |  45 +++
 .../operator/co/CoWindowInvokable.java          |   4 +
 .../streaming/api/windowing/helper/Time.java    |  20 +-
 .../streaming/api/WindowCrossJoinTest.java      |  21 +-
 .../streaming/examples/join/WindowJoin.java     |   3 +-
 .../scala/examples/windowing/WindowJoin.scala   |   7 +-
 .../api/scala/StreamCrossOperator.scala         |  17 +-
 .../api/scala/StreamJoinOperator.scala          |  47 ++--
 .../streaming/api/scala/TemporalOperator.scala  |  43 +++
 21 files changed, 662 insertions(+), 503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index c826274..441360c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -509,6 +509,10 @@ public class JobGraphBuilder {
 		invokableObjects.put(id, invokableObject);
 	}
 
+	public StreamInvokable<?, ?> getInvokable(String id) {
+		return invokableObjects.get(id);
+	}
+
 	public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
 		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType);
 		typeSerializersOut1.put(id, serializer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8d0020e..e969647 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -38,6 +38,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
+import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
@@ -1180,7 +1182,7 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The copy
 	 */
-	protected DataStream<OUT> copy() {
+	public DataStream<OUT> copy() {
 		return new DataStream<OUT>(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 369c3eb..61fc557 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -38,7 +38,7 @@ public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStrea
 	}
 
 	@Override
-	protected DataStreamSink<IN> copy() {
+	public DataStreamSink<IN> copy() {
 		throw new RuntimeException("Data stream sinks cannot be copied");
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 160ef8d..4a5c0c2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -199,7 +199,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	}
 
 	@Override
-	protected GroupedDataStream<OUT> copy() {
+	public GroupedDataStream<OUT> copy() {
 		return new GroupedDataStream<OUT>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 78518c0..d7467d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -72,7 +72,7 @@ public class IterativeDataStream<IN> extends
 	}
 
 	@Override
-	protected IterativeDataStream<IN> copy() {
+	public IterativeDataStream<IN> copy() {
 		return new IterativeDataStream<IN>(this, iterationID, waitTime);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index c19517b..4b6edc0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -186,7 +186,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	@Override
-	protected SingleOutputStreamOperator<OUT, O> copy() {
+	public SingleOutputStreamOperator<OUT, O> copy() {
 		return new SingleOutputStreamOperator<OUT, O>(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
deleted file mode 100644
index 2dd7bc0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.CrossOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-
-public class StreamCrossOperator<I1, I2> extends
-		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
-
-	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		super(input1, input2);
-	}
-
-	@Override
-	protected CrossWindow<I1, I2> createNextWindowOperator() {
-
-		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
-				input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
-
-		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
-				crossWindowFunction,
-				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
-				slideInterval, timeStamp1, timeStamp2));
-	}
-
-	public static class CrossWindow<I1, I2> extends
-			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> {
-
-		private StreamCrossOperator<I1, I2> op;
-
-		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) {
-			super(ds);
-			this.op = op;
-		}
-
-		/**
-		 * Finalizes a temporal Cross transformation by applying a
-		 * {@link CrossFunction} to each pair of crossed elements.<br/>
-		 * Each CrossFunction call returns exactly one element.
-		 * 
-		 * @param function
-		 *            The CrossFunction that is called for each pair of crossed
-		 *            elements.
-		 * @return The crossed data streams
-		 * 
-		 */
-		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
-			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
-					op.input1.getType(), op.input2.getType());
-
-			CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>(
-					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
-					op.slideInterval, op.timeStamp1, op.timeStamp2);
-
-			jobGraphBuilder.setInvokable(id, invokable);
-
-			return setType(outTypeInfo);
-
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
deleted file mode 100644
index de15515..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-public class StreamJoinOperator<I1, I2> extends
-		TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
-
-	public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		super(input1, input2);
-	}
-
-	@Override
-	protected JoinWindow<I1, I2> createNextWindowOperator() {
-		return new JoinWindow<I1, I2>(this);
-	}
-
-	public static class JoinWindow<I1, I2> {
-
-		private StreamJoinOperator<I1, I2> op;
-		private TypeInformation<I1> type1;
-
-		private JoinWindow(StreamJoinOperator<I1, I2> operator) {
-			this.op = operator;
-			this.type1 = op.input1.getType();
-		}
-
-		/**
-		 * Continues a temporal Join transformation. <br/>
-		 * Defines the {@link Tuple} fields of the first join {@link DataStream}
-		 * that should be used as join keys.<br/>
-		 * <b>Note: Fields can only be selected as join keys on Tuple
-		 * DataStreams.</b><br/>
-		 * 
-		 * @param fields
-		 *            The indexes of the other Tuple fields of the first join
-		 *            DataStreams that should be used as keys.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public JoinPredicate<I1, I2> where(int... fields) {
-			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-					new Keys.ExpressionKeys<I1>(fields, type1), type1));
-		}
-
-		/**
-		 * Continues a temporal join transformation. <br/>
-		 * Defines the fields of the first join {@link DataStream} that should
-		 * be used as grouping keys. Fields are the names of member fields of
-		 * the underlying type of the data stream.
-		 * 
-		 * @param fields
-		 *            The fields of the first join DataStream that should be
-		 *            used as keys.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public JoinPredicate<I1, I2> where(String... fields) {
-			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-					new Keys.ExpressionKeys<I1>(fields, type1), type1));
-		}
-
-		/**
-		 * Continues a temporal Join transformation and defines a
-		 * {@link KeySelector} function for the first join {@link DataStream}
-		 * .</br> The KeySelector function is called for each element of the
-		 * first DataStream and extracts a single key value on which the
-		 * DataStream is joined. </br>
-		 * 
-		 * @param keySelector
-		 *            The KeySelector function which extracts the key values
-		 *            from the DataStream on which it is joined.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
-			return new JoinPredicate<I1, I2>(op, keySelector);
-		}
-
-		// ----------------------------------------------------------------------------------------
-
-	}
-
-	/**
-	 * Intermediate step of a temporal Join transformation. <br/>
-	 * To continue the Join transformation, select the join key of the second
-	 * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
-	 * 
-	 */
-	public static class JoinPredicate<I1, I2> {
-
-		private StreamJoinOperator<I1, I2> op;
-		private KeySelector<I1, ?> keys1;
-		private KeySelector<I2, ?> keys2;
-		private TypeInformation<I2> type2;
-
-		private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
-			this.op = operator;
-			this.keys1 = keys1;
-			this.type2 = op.input2.getType();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines the {@link Tuple}
-		 * fields of the second join {@link DataStream} that should be used as
-		 * join keys.<br/>
-		 * </p> The resulting operator wraps each pair of joining elements in a
-		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
-		 * use {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * @param fields
-		 *            The indexes of the Tuple fields of the second join
-		 *            DataStream that should be used as keys.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public JoinedStream<I1, I2> equalTo(int... fields) {
-			keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
-					type2);
-			return createJoinOperator();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines the fields of the
-		 * second join {@link DataStream} that should be used as join keys. </p>
-		 * The resulting operator wraps each pair of joining elements in a
-		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
-		 * use {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * @param fields
-		 *            The fields of the second join DataStream that should be
-		 *            used as keys.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public JoinedStream<I1, I2> equalTo(String... fields) {
-			this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
-					type2), type2);
-			return createJoinOperator();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines a
-		 * {@link KeySelector} function for the second join {@link DataStream}
-		 * .</br> The KeySelector function is called for each element of the
-		 * second DataStream and extracts a single key value on which the
-		 * DataStream is joined. </p> The resulting operator wraps each pair of
-		 * joining elements in a Tuple2<I1,I2>(first, second). To use a
-		 * different wrapping function use
-		 * {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * 
-		 * @param keySelector
-		 *            The KeySelector function which extracts the key values
-		 *            from the second DataStream on which it is joined.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
-			this.keys2 = keySelector;
-			return createJoinOperator();
-		}
-
-		private JoinedStream<I1, I2> createJoinOperator() {
-
-			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
-
-			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
-					joinFunction, this);
-
-			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
-					op.input1.getType(), op.input2.getType());
-
-			return new JoinedStream<I1, I2>(this, op.input1
-					.groupBy(keys1)
-					.connect(op.input2.groupBy(keys2))
-					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-							op.slideInterval, op.timeStamp1, op.timeStamp2));
-		}
-	}
-
-	public static class JoinedStream<I1, I2> extends
-			SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
-		private final JoinPredicate<I1, I2> predicate;
-
-		private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) {
-			super(ds);
-			this.predicate = predicate;
-		}
-
-		/**
-		 * Completes a stream join. </p> The resulting operator wraps each pair
-		 * of joining elements using the user defined {@link JoinFunction}
-		 * 
-		 * @return The joined data stream.
-		 */
-		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
-
-			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
-					predicate.op.input1.getType(), predicate.op.input2.getType());
-
-			CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>(
-					getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
-					predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
-
-			jobGraphBuilder.setInvokable(id, invokable);
-
-			return setType(outType);
-		}
-	}
-
-	public static final class DefaultJoinFunction<I1, I2> implements
-			JoinFunction<I1, I2, Tuple2<I1, I2>> {
-
-		private static final long serialVersionUID = 1L;
-		private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
-
-		@Override
-		public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
-			outTuple.f0 = first;
-			outTuple.f1 = second;
-			return outTuple;
-		}
-	}
-
-	public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
-			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
-		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
deleted file mode 100644
index e5385f0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public abstract class TemporalOperator<I1, I2, OP> {
-
-	public final DataStream<I1> input1;
-	public final DataStream<I2> input2;
-
-	public long windowSize;
-	public long slideInterval;
-
-	public TimestampWrapper<I1> timeStamp1;
-	public TimestampWrapper<I2> timeStamp2;
-
-	public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		if (input1 == null || input2 == null) {
-			throw new NullPointerException();
-		}
-		this.input1 = input1.copy();
-		this.input2 = input2.copy();
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be transformed.
-	 * 
-	 * @param windowSize
-	 *            The size of the window in milliseconds.
-	 * @return An incomplete temporal transformation.
-	 */
-	public OP onWindow(long windowSize) {
-		return onWindow(windowSize, windowSize);
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be transformed.
-	 * 
-	 * @param windowSize
-	 *            The size of the window in milliseconds.
-	 * @param slideInterval
-	 *            The slide size of the window.
-	 * @return An incomplete temporal transformation.
-	 */
-	@SuppressWarnings("unchecked")
-	public OP onWindow(long windowSize, long slideInterval) {
-		return onWindow(windowSize, slideInterval,
-				(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
-				(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be transformed.
-	 * 
-	 * @param windowSize
-	 *            The size of the window in milliseconds.
-	 * @param slideInterval
-	 *            The slide size of the window.
-	 * @param timeStamp1
-	 *            The timestamp used to extract time from the elements of the
-	 *            first data stream.
-	 * @param timeStamp2
-	 *            The timestamp used to extract time from the elements of the
-	 *            second data stream.
-	 * @return An incomplete temporal transformation.
-	 */
-	public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1,
-			TimestampWrapper<I2> timeStamp2) {
-
-		this.windowSize = windowSize;
-		this.slideInterval = slideInterval;
-
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-
-		return createNextWindowOperator();
-	}
-
-	protected abstract OP createNextWindowOperator();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
new file mode 100644
index 0000000..8422400
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+
+public class StreamCrossOperator<I1, I2> extends
+		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
+	
+	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		super(input1, input2);
+	}
+
+	@Override
+	protected CrossWindow<I1, I2> createNextWindowOperator() {
+
+		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
+				input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
+				crossWindowFunction,
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
+				slideInterval, timeStamp1, timeStamp2));
+	}
+
+	public static class CrossWindow<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements
+			TemporalWindow<CrossWindow<I1, I2>> {
+
+		private StreamCrossOperator<I1, I2> op;
+
+		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) {
+			super(ds);
+			this.op = op;
+		}
+
+		public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+			return every(timeUnit.toMillis(length));
+		}
+
+		@SuppressWarnings("unchecked")
+		public CrossWindow<I1, I2> every(long length) {
+			((CoWindowInvokable<I1, I2, ?>) jobGraphBuilder.getInvokable(id)).setSlideSize(length);
+			return this;
+		}
+
+		/**
+		 * Finalizes a temporal Cross transformation by applying a
+		 * {@link CrossFunction} to each pair of crossed elements.<br/>
+		 * Each CrossFunction call returns exactly one element.
+		 * 
+		 * @param function
+		 *            The CrossFunction that is called for each pair of crossed
+		 *            elements.
+		 * @return The crossed data streams
+		 * 
+		 */
+		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
+			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
+					op.input1.getType(), op.input2.getType());
+
+			CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>(
+					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
+					op.slideInterval, op.timeStamp1, op.timeStamp2);
+
+			jobGraphBuilder.setInvokable(id, invokable);
+
+			return setType(outTypeInfo);
+
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
new file mode 100644
index 0000000..626b9f1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
+public class StreamJoinOperator<I1, I2> extends
+		TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
+
+	public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		super(input1, input2);
+	}
+
+	@Override
+	protected JoinWindow<I1, I2> createNextWindowOperator() {
+		return new JoinWindow<I1, I2>(this);
+	}
+
+	public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> {
+
+		private StreamJoinOperator<I1, I2> op;
+		private TypeInformation<I1> type1;
+
+		private JoinWindow(StreamJoinOperator<I1, I2> operator) {
+			this.op = operator;
+			this.type1 = op.input1.getType();
+		}
+
+		/**
+		 * Continues a temporal Join transformation. <br/>
+		 * Defines the {@link Tuple} fields of the first join {@link DataStream}
+		 * that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple
+		 * DataStreams.</b><br/>
+		 * 
+		 * @param fields
+		 *            The indexes of the other Tuple fields of the first join
+		 *            DataStreams that should be used as keys.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public JoinPredicate<I1, I2> where(int... fields) {
+			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
+					new Keys.ExpressionKeys<I1>(fields, type1), type1));
+		}
+
+		/**
+		 * Continues a temporal join transformation. <br/>
+		 * Defines the fields of the first join {@link DataStream} that should
+		 * be used as grouping keys. Fields are the names of member fields of
+		 * the underlying type of the data stream.
+		 * 
+		 * @param fields
+		 *            The fields of the first join DataStream that should be
+		 *            used as keys.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public JoinPredicate<I1, I2> where(String... fields) {
+			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
+					new Keys.ExpressionKeys<I1>(fields, type1), type1));
+		}
+
+		/**
+		 * Continues a temporal Join transformation and defines a
+		 * {@link KeySelector} function for the first join {@link DataStream}
+		 * .</br> The KeySelector function is called for each element of the
+		 * first DataStream and extracts a single key value on which the
+		 * DataStream is joined. </br>
+		 * 
+		 * @param keySelector
+		 *            The KeySelector function which extracts the key values
+		 *            from the DataStream on which it is joined.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
+			return new JoinPredicate<I1, I2>(op, keySelector);
+		}
+
+		@Override
+		public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+			return every(timeUnit.toMillis(length));
+		}
+
+		@Override
+		public JoinWindow<I1, I2> every(long length) {
+			op.slideInterval = length;
+			return this;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+	}
+
+	/**
+	 * Intermediate step of a temporal Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second
+	 * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
+	 * 
+	 */
+	public static class JoinPredicate<I1, I2> {
+
+		private StreamJoinOperator<I1, I2> op;
+		private KeySelector<I1, ?> keys1;
+		private KeySelector<I2, ?> keys2;
+		private TypeInformation<I2> type2;
+
+		private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
+			this.op = operator;
+			this.keys1 = keys1;
+			this.type2 = op.input2.getType();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines the {@link Tuple}
+		 * fields of the second join {@link DataStream} that should be used as
+		 * join keys.<br/>
+		 * </p> The resulting operator wraps each pair of joining elements in a
+		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
+		 * use {@link JoinedStream#with(JoinFunction)}
+		 * 
+		 * @param fields
+		 *            The indexes of the Tuple fields of the second join
+		 *            DataStream that should be used as keys.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public JoinedStream<I1, I2> equalTo(int... fields) {
+			keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
+					type2);
+			return createJoinOperator();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines the fields of the
+		 * second join {@link DataStream} that should be used as join keys. </p>
+		 * The resulting operator wraps each pair of joining elements in a
+		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
+		 * use {@link JoinedStream#with(JoinFunction)}
+		 * 
+		 * @param fields
+		 *            The fields of the second join DataStream that should be
+		 *            used as keys.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public JoinedStream<I1, I2> equalTo(String... fields) {
+			this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
+					type2), type2);
+			return createJoinOperator();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines a
+		 * {@link KeySelector} function for the second join {@link DataStream}
+		 * .</br> The KeySelector function is called for each element of the
+		 * second DataStream and extracts a single key value on which the
+		 * DataStream is joined. </p> The resulting operator wraps each pair of
+		 * joining elements in a Tuple2<I1,I2>(first, second). To use a
+		 * different wrapping function use
+		 * {@link JoinedStream#with(JoinFunction)}
+		 * 
+		 * 
+		 * @param keySelector
+		 *            The KeySelector function which extracts the key values
+		 *            from the second DataStream on which it is joined.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			this.keys2 = keySelector;
+			return createJoinOperator();
+		}
+
+		private JoinedStream<I1, I2> createJoinOperator() {
+
+			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
+
+			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
+					joinFunction, this);
+
+			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
+					op.input1.getType(), op.input2.getType());
+
+			return new JoinedStream<I1, I2>(this, op.input1
+					.groupBy(keys1)
+					.connect(op.input2.groupBy(keys2))
+					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+							op.slideInterval, op.timeStamp1, op.timeStamp2));
+		}
+	}
+
+	public static class JoinedStream<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
+		private final JoinPredicate<I1, I2> predicate;
+
+		private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) {
+			super(ds);
+			this.predicate = predicate;
+		}
+
+		/**
+		 * Completes a stream join. </p> The resulting operator wraps each pair
+		 * of joining elements using the user defined {@link JoinFunction}
+		 * 
+		 * @return The joined data stream.
+		 */
+		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
+
+			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
+					predicate.op.input1.getType(), predicate.op.input2.getType());
+
+			CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>(
+					getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
+					predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
+
+			jobGraphBuilder.setInvokable(id, invokable);
+
+			return setType(outType);
+		}
+	}
+
+	public static final class DefaultJoinFunction<I1, I2> implements
+			JoinFunction<I1, I2, Tuple2<I1, I2>> {
+
+		private static final long serialVersionUID = 1L;
+		private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
+
+		@Override
+		public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
+			outTuple.f0 = first;
+			outTuple.f1 = second;
+			return outTuple;
+		}
+	}
+
+	public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
+			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
+		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
new file mode 100644
index 0000000..f121dfa
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
+
+	public final DataStream<I1> input1;
+	public final DataStream<I2> input2;
+
+	public long windowSize;
+	public long slideInterval;
+
+	public TimestampWrapper<I1> timeStamp1;
+	public TimestampWrapper<I2> timeStamp2;
+
+	public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		if (input1 == null || input2 == null) {
+			throw new NullPointerException();
+		}
+		this.input1 = input1.copy();
+		this.input2 = input2.copy();
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be transformed.
+	 * To define sliding windows call {@link TemporalWindow#every} on the
+	 * resulting operator.
+	 * 
+	 * @param length
+	 *            The size of the window in milliseconds.
+	 * @param timeUnit
+	 *            The unit if time to be used
+	 * @return An incomplete temporal transformation.
+	 */
+	@SuppressWarnings("unchecked")
+	public OP onWindow(long length, TimeUnit timeUnit) {
+		return onWindow(timeUnit.toMillis(length),
+				(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be
+	 * transformed.To define sliding windows call {@link TemporalWindow#every}
+	 * on the resulting operator.
+	 * 
+	 * @param windowSize
+	 *            The size of the window in milliseconds.
+	 * @param timeStamp1
+	 *            The timestamp used to extract time from the elements of the
+	 *            first data stream.
+	 * @param timeStamp2
+	 *            The timestamp used to extract time from the elements of the
+	 *            second data stream.
+	 * @return An incomplete temporal transformation.
+	 */
+	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) {
+		return onWindow(length, timeStamp1, timeStamp2, 0);
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be
+	 * transformed.To define sliding windows call {@link TemporalWindow#every}
+	 * on the resulting operator.
+	 * 
+	 * @param windowSize
+	 *            The size of the window in milliseconds.
+	 * @param timeStamp1
+	 *            The timestamp used to extract time from the elements of the
+	 *            first data stream.
+	 * @param timeStamp2
+	 *            The timestamp used to extract time from the elements of the
+	 *            second data stream.
+	 * @param startTime
+	 *            The start time to measure the first window
+	 * @return An incomplete temporal transformation.
+	 */
+	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2,
+			long startTime) {
+		return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime),
+				new TimestampWrapper<I2>(timeStamp2, startTime));
+	}
+
+	private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
+			TimestampWrapper<I2> timeStamp2) {
+
+		this.windowSize = length;
+		this.slideInterval = length;
+
+		this.timeStamp1 = timeStamp1;
+		this.timeStamp2 = timeStamp2;
+
+		return createNextWindowOperator();
+	}
+
+	protected abstract OP createNextWindowOperator();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
new file mode 100644
index 0000000..8ac1492
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TemporalWindow<T> {
+
+	/**
+	 * Defines the slide interval for this temporal operator
+	 * 
+	 * @param length
+	 *            Length of the window
+	 * @param timeUnit
+	 *            Unit of time
+	 * @return The temporal operator with slide interval specified
+	 */
+	public T every(long length, TimeUnit timeUnit);
+
+	/**
+	 * Defines the slide interval for this temporal operator
+	 * 
+	 * @param length
+	 *            Length of the window
+	 * @return The temporal operator with slide interval specified
+	 */
+	public T every(long length);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index 59552f4..03219b7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -190,4 +190,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
 	protected void callUserFunction2() throws Exception {
 	}
 
+	public void setSlideSize(long slideSize) {
+		this.slideSize = slideSize;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 9dc1c8c..f94eea4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -35,13 +35,13 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  */
 public class Time<DATA> implements WindowingHelper<DATA> {
 
-	private long length;
-	private TimeUnit granularity;
-	private TimestampWrapper<DATA> timestampWrapper;
-	private long delay;
+	protected long length;
+	protected TimeUnit granularity;
+	protected TimestampWrapper<DATA> timestampWrapper;
+	protected long delay;
 
 	/**
-	 * Creates an helper representing a trigger which triggers every given
+	 * Creates a helper representing a trigger which triggers every given
 	 * length or an eviction which evicts all elements older than length.
 	 * 
 	 * @param length
@@ -62,7 +62,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	}
 
 	/**
-	 * Creates an helper representing a trigger which triggers every given
+	 * Creates a helper representing a trigger which triggers every given
 	 * length or an eviction which evicts all elements older than length.
 	 * 
 	 * @param length
@@ -160,11 +160,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 		return this;
 	}
 
-	private long granularityInMillis() {
-		if (granularity != null) {
-			return this.granularity.toMillis(this.length);
-		} else {
-			return this.length;
-		}
+	protected long granularityInMillis() {
+		return granularity == null ? length : granularity.toMillis(length);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index e856f07..3da6b5f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
@@ -98,14 +98,14 @@ public class WindowCrossJoinTest implements Serializable {
 
 		inStream1
 				.join(inStream2)
-				.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0)
+				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
+						new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
 				.addSink(new JoinResultSink());
 
 		inStream1
 				.cross(inStream2)
-				.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>())
+				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
+						new MyTimestamp<Tuple1<Integer>>(), 100)
 				.with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
 
 					private static final long serialVersionUID = 1L;
@@ -123,22 +123,13 @@ public class WindowCrossJoinTest implements Serializable {
 		assertEquals(crossExpectedResults, crossResults);
 	}
 
-	private static class MyTimestamp<T> extends TimestampWrapper<T> {
-		public MyTimestamp() {
-			super(null, 0);
-		}
-
+	private static class MyTimestamp<T> implements Timestamp<T> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public long getTimestamp(T value) {
 			return 101L;
 		}
-
-		@Override
-		public long getStartTime() {
-			return 100L;
-		}
 	}
 
 	private static class JoinResultSink implements

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 897ad48..dcfed50 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.join;
 
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -67,7 +68,7 @@ public class WindowJoin {
 		// second windows
 		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
 						.join(salaries)
-						.onWindow(1000)
+						.onWindow(1, TimeUnit.SECONDS)
 						.where(0)
 						.equalTo(0)
 						.with(new MyJoinFunction());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index a19e4b4..e87d4a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.util.Collector
 import scala.util.Random
+import java.util.concurrent.TimeUnit
 
 object WindowJoin {
 
@@ -37,10 +38,10 @@ object WindowJoin {
     val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
     val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
 
-    //Join the two input streams by id on the last second and create new Person objects
-    //containing both name and age
+    //Join the two input streams by id on the last second every 2 seconds and create new 
+    //Person objects containing both name and age
     val joined =
-      names.join(ages).onWindow(1000)
+      names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS)
                       .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
 
     joined print

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index e26db62..d620d5e 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.scala
 
 import scala.reflect.ClassTag
-
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,11 +27,12 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.datastream.TemporalOperator
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -72,7 +72,7 @@ object StreamCrossOperator {
 
   private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
                                            javaStream: JavaStream[(I1, I2)]) extends
-    DataStream[(I1, I2)](javaStream) {
+    DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
 
     /**
      * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
@@ -90,6 +90,17 @@ object StreamCrossOperator {
 
       javaStream.setType(implicitly[TypeInformation[R]])
     }
+    
+    override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): CrossWindow[I1, I2] = {
+      val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder()
+      val invokable = builder.getInvokable(javaStream.getId())
+      invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
+      this
+    }
   }
 
   private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index d587d56..cb79e2a 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -20,25 +20,24 @@ package org.apache.flink.streaming.api.scala
 
 import scala.Array.canBuildFrom
 import scala.reflect.ClassTag
-
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.function.co.JoinWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
 
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator() = {
@@ -48,10 +47,11 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
 object StreamJoinOperator {
 
-  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
+  class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends 
+  TemporalWindow[JoinWindow[I1, I2]] {
 
     private[flink] val type1 = op.input1.getType();
-    
+
     /**
      * Continues a temporal Join transformation by defining
      * the fields in the first stream to be used as keys for the join.
@@ -60,7 +60,7 @@ object StreamJoinOperator {
      */
     def where(fields: Int*) = {
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type1),type1))
+        new Keys.ExpressionKeys(fields.toArray, type1), type1))
     }
 
     /**
@@ -69,9 +69,9 @@ object StreamJoinOperator {
      * The resulting incomplete join can be completed by JoinPredicate.equalTo()
      * to define the second key.
      */
-    def where(firstField: String, otherFields: String*) = 
+    def where(firstField: String, otherFields: String*) =
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1))  
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1))
 
     /**
      * Continues a temporal Join transformation by defining
@@ -89,10 +89,19 @@ object StreamJoinOperator {
       new JoinPredicate[I1, I2](op, keyExtractor)
     }
 
+    override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = {
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): JoinWindow[I1, I2] = {
+      op.slideInterval = length
+      this
+    }
+
   }
 
   class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
-                              private[flink] val keys1: KeySelector[I1, _]) {
+    private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
     private[flink] val type2 = op.input2.getType();
 
@@ -104,7 +113,7 @@ object StreamJoinOperator {
      */
     def equalTo(fields: Int*): JoinedStream[I1, I2] = {
       finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type2),type2))
+        new Keys.ExpressionKeys(fields.toArray, type2), type2))
     }
 
     /**
@@ -113,9 +122,9 @@ object StreamJoinOperator {
      * (first, second)
      * To define a custom wrapping, use JoinedStream.apply(...)
      */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = 
-     finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2))
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
+      finish(KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
 
     /**
      * Creates a temporal join transformation by defining the second join key.
@@ -159,11 +168,11 @@ object StreamJoinOperator {
 
       return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
         .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
     }
   }
 
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends 
   DataStream[(I1, I2)](javaStream) {
 
     private val op = jp.op
@@ -186,7 +195,7 @@ object StreamJoinOperator {
   }
 
   private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
-                                                      joinFunction: (I1, I2) => R) = {
+    joinFunction: (I1, I2) => R) = {
     Validate.notNull(joinFunction, "Join function must not be null.")
 
     val joinFun = new JoinFunction[I1, I2, R] {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
new file mode 100644
index 0000000..fd3a4a9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp }
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
+
+abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
+  i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
+
+  def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = {
+    val timeStamp1 = getTS(ts1)
+    val timeStamp2 = getTS(ts2)
+    onWindow(length, timeStamp1, timeStamp2, startTime)
+  }
+
+  def getTS[R](ts: R => Long): Timestamp[R] = {
+    new Timestamp[R] {
+      val cleanFun = clean(ts, true)
+      def getTimestamp(in: R) = cleanFun(in)
+    }
+  }
+
+}


Mime
View raw message