flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [06/27] incubator-flink git commit: [scala] [streaming] Temporal join operator added
Date Sun, 04 Jan 2015 20:50:56 GMT
[scala] [streaming] Temporal join operator added


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/555837cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/555837cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/555837cd

Branch: refs/heads/master
Commit: 555837cd6da98b0311a7f752a3b2523f6efbf6a1
Parents: 1c87d8b
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Dec 20 18:12:19 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |   9 +
 .../api/datastream/ConnectedDataStream.java     |   4 +-
 .../streaming/api/datastream/DataStream.java    |  15 +-
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |  10 +-
 .../api/datastream/StreamJoinOperator.java      | 118 +++++++-----
 .../api/datastream/StreamProjection.java        |   2 +-
 .../streaming/util/keys/FieldsKeySelector.java  |  15 +-
 .../streaming/api/WindowCrossJoinTest.java      |   2 +-
 .../streaming/examples/join/WindowJoin.java     |  36 +++-
 .../flink/api/scala/streaming/DataStream.scala  |   6 +-
 .../api/scala/streaming/FieldsKeySelector.scala |  29 +++
 .../scala/streaming/StreamJoinOperator.scala    | 188 +++++++++++++++++++
 .../scala/streaming/WindowedDataStream.scala    |   2 +-
 14 files changed, 359 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index f358de9..c826274 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -505,6 +505,15 @@ public class JobGraphBuilder {
 
 	}
 
+	public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> invokableObject)
{
+		invokableObjects.put(id, invokableObject);
+	}
+
+	public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
+		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType);
+		typeSerializersOut1.put(id, serializer);
+	}
+
 	/**
 	 * Sets TypeSerializerWrapper from one vertex to another, used with some
 	 * sinks.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 65a6c37..39b6460 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -539,7 +539,7 @@ public class ConnectedDataStream<IN1, IN2> {
 		return invokable;
 	}
 
-	protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
+	public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
 			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
 			long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
 			TimeStamp<IN2> timestamp2) {
@@ -550,7 +550,7 @@ public class ConnectedDataStream<IN1, IN2> {
 		if (slideInterval < 1) {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
-
+		
 		return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2,
OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a236312..2a0b673 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -92,7 +92,8 @@ public class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
-	protected final TypeInformation<OUT> typeInfo;
+	@SuppressWarnings("rawtypes")
+	protected TypeInformation typeInfo;
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -175,10 +176,18 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The type of the datastream.
 	 */
+	@SuppressWarnings("unchecked")
 	public TypeInformation<OUT> getType() {
 		return this.typeInfo;
 	}
 
+	@SuppressWarnings("unchecked")
+	public <R> DataStream<R> setType(TypeInformation<R> outType) {
+		jobGraphBuilder.setOutType(id, outType);
+		typeInfo = outType;
+		return (DataStream<R>) this;
+	}
+
 	public <F> F clean(F f) {
 		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);
@@ -979,7 +988,7 @@ public class DataStream<OUT> {
 
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", typeInfo, invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", getType(),
invokable);
 
 		return returnStream;
 	}
@@ -1077,7 +1086,7 @@ public class DataStream<OUT> {
 	 */
 	public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
 
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
typeInfo);
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
getType());
 
 		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
 				clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 18b4b75..160ef8d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT>
{
 		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
 				keySelector);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", typeInfo,
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", getType(),
 				invokable);
 
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 016322b..c19517b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.state.OperatorState;
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
  * applied on a {@link DataStream} with one predefined output type.
- *
+ * 
  * @param <OUT>
  *            Output type of the operator.
  * @param <O>
@@ -52,6 +52,13 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		super(dataStream);
 	}
 
+	@SuppressWarnings("unchecked")
+	public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R>
outType) {
+		jobGraphBuilder.setOutType(id, outType);
+		typeInfo = outType;
+		return (SingleOutputStreamOperator<R, ?>) this;
+	}
+
 	/**
 	 * Sets the degree of parallelism for this operator. The degree must be 1 or
 	 * more.
@@ -71,7 +78,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		return this;
 	}
 
-
 	/**
 	 * Sets the maximum time frequency (ms) for the flushing of the output
 	 * buffer. By default the output buffers flush only when they are full.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index 3051587..cbcc1b4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
 import org.apache.flink.streaming.util.keys.PojoKeySelector;
 
@@ -122,114 +124,128 @@ public class StreamJoinOperator<I1, I2> extends
 		}
 
 		/**
-		 * Continues a temporal Join transformation and defines the
-		 * {@link Tuple} fields of the second join {@link DataStream} that
-		 * should be used as join keys.<br/>
-		 * <b>Note: Fields can only be selected as join keys on Tuple
-		 * DataStreams.</b><br/>
+		 * Creates a temporal Join transformation and defines the {@link Tuple}
+		 * fields of the second join {@link DataStream} that should be used as
+		 * join keys.<br/>
+		 * </p> The resulting operator wraps each pair of joining elements in a
+		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
+		 * use {@link JoinedStream#with(JoinFunction)}
 		 * 
 		 * @param fields
 		 *            The indexes of the Tuple fields of the second join
 		 *            DataStream that should be used as keys.
-		 * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
-		 *         {@link FinalizeStreamJoin#withDefault} to complete
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
 		 */
-		public FinalizeStreamJoin<I1, I2> equalTo(int... fields) {
+		public JoinedStream<I1, I2> equalTo(int... fields) {
 			keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields);
-			return new FinalizeStreamJoin<I1, I2>(this);
+			return createJoinOperator();
 		}
 
 		/**
-		 * Continues a temporal Join transformation and defines the fields of
-		 * the second join {@link DataStream} that should be used as join keys.<br/>
+		 * Creates a temporal Join transformation and defines the fields of the
+		 * second join {@link DataStream} that should be used as join keys. </p>
+		 * The resulting operator wraps each pair of joining elements in a
+		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
+		 * use {@link JoinedStream#with(JoinFunction)}
 		 * 
 		 * @param fields
 		 *            The fields of the second join DataStream that should be
 		 *            used as keys.
-		 * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
-		 *         {@link FinalizeStreamJoin#withDefault} to complete
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
 		 */
-		public FinalizeStreamJoin<I1, I2> equalTo(String... fields) {
+		public JoinedStream<I1, I2> equalTo(String... fields) {
 			this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields);
-			return new FinalizeStreamJoin<I1, I2>(this);
+			return createJoinOperator();
 		}
 
 		/**
-		 * Continues a temporal Join transformation and defines a
+		 * Creates a temporal Join transformation and defines a
 		 * {@link KeySelector} function for the second join {@link DataStream}
 		 * .</br> The KeySelector function is called for each element of the
 		 * second DataStream and extracts a single key value on which the
-		 * DataStream is joined. </br>
+		 * DataStream is joined. </p> The resulting operator wraps each pair of
+		 * joining elements in a Tuple2<I1,I2>(first, second). To use a
+		 * different wrapping function use
+		 * {@link JoinedStream#with(JoinFunction)}
+		 * 
 		 * 
 		 * @param keySelector
 		 *            The KeySelector function which extracts the key values
 		 *            from the second DataStream on which it is joined.
-		 * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
-		 *         {@link FinalizeStreamJoin#withDefault} to complete
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
 		 */
-		public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector)
{
+		public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector)
{
 			this.keys2 = keySelector;
-			return new FinalizeStreamJoin<I1, I2>(this);
+			return createJoinOperator();
 		}
 
+		private JoinedStream<I1, I2> createJoinOperator() {
+
+			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1,
I2>();
+
+			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
+					joinFunction, this);
+
+			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1,
I2>>(
+					op.input1.getType(), op.input2.getType());
+
+			return new JoinedStream<I1, I2>(this, op.input1
+					.groupBy(keys1)
+					.connect(op.input2.groupBy(keys2))
+					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+							op.slideInterval, op.timeStamp1, op.timeStamp2));
+		}
 	}
 
-	public static class FinalizeStreamJoin<I1, I2> {
+	public static class JoinedStream<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
 		private final JoinPredicate<I1, I2> predicate;
 
-		private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) {
+		private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1,
I2>> ds) {
+			super(ds);
 			this.predicate = predicate;
 		}
 
 		/**
 		 * Completes a stream join. </p> The resulting operator wraps each pair
-		 * of joining elements into a {@link Tuple2}, with the element of the
-		 * first input being the first field of the tuple and the element of the
-		 * second input being the second field of the tuple.
-		 * 
-		 * @return The joined data stream.
-		 */
-		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() {
-			return createJoinOperator(new DefaultJoinFunction<I1, I2>());
-		}
-
-		/**
-		 * Completes a stream join. </p> The resulting operator wraps each pair
 		 * of joining elements using the user defined {@link JoinFunction}
 		 * 
 		 * @return The joined data stream.
 		 */
 		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2,
OUT> joinFunction) {
-			return createJoinOperator(joinFunction);
-		}
 
-		private <OUT> SingleOutputStreamOperator<OUT, ?> createJoinOperator(
-				JoinFunction<I1, I2, OUT> joinFunction) {
-
-			JoinWindowFunction<I1, I2, OUT> joinWindowFunction = new JoinWindowFunction<I1,
I2, OUT>(
-					predicate.keys1, predicate.keys2, joinFunction);
+			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
+					predicate.op.input1.getType(), predicate.op.input2.getType());
 
-			StreamJoinOperator<I1, I2> op = predicate.op;
+			CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>(
+					getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
+					predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
 
-			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
-					op.input1.getType(), op.input2.getType());
+			jobGraphBuilder.setInvokable(id, invokable);
 
-			return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction,
-					outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
+			return setType(outType);
 		}
 	}
 
-	public static final class DefaultJoinFunction<T1, T2> implements
-			JoinFunction<T1, T2, Tuple2<T1, T2>> {
+	public static final class DefaultJoinFunction<I1, I2> implements
+			JoinFunction<I1, I2, Tuple2<I1, I2>> {
 
 		private static final long serialVersionUID = 1L;
-		private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
+		private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
 
 		@Override
-		public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception {
+		public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
 			outTuple.f0 = first;
 			outTuple.f1 = second;
 			return outTuple;
 		}
 	}
+
+	public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
+			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
+		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index e71b18c..c8ad533 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -56,7 +56,7 @@ public class StreamProjection<IN> {
 	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
 		this.dataStream = dataStream;
 		this.fieldIndexes = fieldIndexes;
-		this.inTypeInfo = dataStream.typeInfo;
+		this.inTypeInfo = dataStream.getType();
 		if (!inTypeInfo.isTupleType()) {
 			throw new RuntimeException("Only Tuple DataStreams can be projected");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
index d785109..171ddc9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
@@ -56,12 +56,13 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN,
Object> {
 	protected Object key;
 	protected boolean simpleKey;
 
-	public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class,
Tuple3.class,
-			Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
-			Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class,
-			Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class,
-			Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
-			Tuple25.class };
+	@SuppressWarnings("unchecked")
+	public static Class<? extends Tuple>[] tupleClasses = new Class[] { Tuple1.class,
Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
 
 	public FieldsKeySelector(int... fields) {
 		this.keyFields = fields;
@@ -73,7 +74,7 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN,
Object> {
 		}
 
 		try {
-			key = (Tuple) tupleClasses[fields.length - 1].newInstance();
+			key =  tupleClasses[fields.length - 1].newInstance();
 		} catch (Exception e) {
 			throw new RuntimeException(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index 07d40ff..37f8c0a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable {
 		DataStream<Integer> inStream2 = env.fromCollection(in2);
 
 		inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
-				.where(0).equalTo(0).withDefault().addSink(new JoinResultSink());
+				.where(0).equalTo(0).addSink(new JoinResultSink());
 
 		inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
 				.with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>,
Integer>>() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 2586e3c..897ad48 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.examples.join;
 
 import java.util.Random;
 
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +36,7 @@ import org.apache.flink.util.Collector;
  * his example will join two streams with a sliding window. One which emits
  * grades and one which emits salaries of people.
  * </p>
- *
+ * 
  * <p>
  * This example shows how to:
  * <ul>
@@ -63,13 +65,13 @@ public class WindowJoin {
 
 		// apply a temporal join over the two stream based on the names over one
 		// second windows
-		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>>
joinedStream = grades
-				.join(salaries)
-				.onWindow(1000)
-				.where(0)
-				.equalTo(0)
-				.withDefault();
-
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
+						.join(salaries)
+						.onWindow(1000)
+						.where(0)
+						.equalTo(0)
+						.with(new MyJoinFunction());
+		
 		// emit result
 		if (fileOutput) {
 			joinedStream.writeAsText(outputPath, 1);
@@ -141,6 +143,24 @@ public class WindowJoin {
 		}
 	}
 
+	public static class MyJoinFunction
+			implements
+			JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String,
Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer,
Integer>();
+
+		@Override
+		public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
+				Tuple2<String, Integer> second) throws Exception {
+			joined.f0 = first.f0;
+			joined.f1 = first.f1;
+			joined.f2 = second.f1;
+			return joined;
+		}
+	}
+
 	// *************************************************************************
 	// UTIL METHODS
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index a117412..871fede 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -108,7 +108,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def groupBy(fields: Int*): DataStream[T] =
-    new DataStream[T](javaStream.groupBy(fields: _*))
+    new DataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
@@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def partitionBy(fields: Int*): DataStream[T] =
-    new DataStream[T](javaStream.partitionBy(fields: _*))
+    new DataStream[T](javaStream.partitionBy(new FieldsKeySelector[T](fields: _*)))
 
   /**
    * Sets the partitioning of the DataStream so that the output is
@@ -458,6 +458,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     split(selector)
   }
 
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T,
R](javaStream, stream.getJavaStream)
+
   /**
    * Writes a DataStream to the standard output stream (stdout). For each
    * element of the DataStream the result of .toString is

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
new file mode 100644
index 0000000..4223512
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
@@ -0,0 +1,29 @@
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.util.keys.{ FieldsKeySelector => JavaSelector }
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+
+class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] {
+
+  val t: Tuple = JavaSelector.tupleClasses(fields.length - 1).newInstance()
+
+  override def getKey(value: IN): Tuple =
+
+    value match {
+      case prod: Product => {
+        for (i <- 0 to fields.length - 1) {
+          t.setField(prod.productElement(fields(i)), i)
+        }
+        t
+      }
+      case tuple: Tuple => {
+        for (i <- 0 to fields.length - 1) {
+          t.setField(tuple.getField(fields(i)), i)
+        }
+        t
+      }
+      case _ => throw new RuntimeException("Only tuple types are supported")
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
new file mode 100644
index 0000000..93950a2
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.util.keys.PojoKeySelector
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+
+/**
+ * Scala API entry point for temporal (windowed) joins of two DataStreams.
+ * Extends TemporalOperator, so the window size / slide interval are
+ * configured first; the returned JoinWindow then defines the join keys.
+ */
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1,
I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+  // Invoked by TemporalOperator once windowing is fixed; hands the user
+  // over to JoinWindow.where(...) for key definition.
+  override def createNextWindowOperator() = {
+    new StreamJoinOperator.JoinWindow[I1, I2](this)
+  }
+}
+
+object StreamJoinOperator {
+
+  // Strips non-serializable enclosing references from the closure and,
+  // when checkSerializable is true, verifies the result is serializable
+  // so it can be shipped with the job graph.
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F =
{
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Intermediate join step: the window is defined but the join keys are not.
+   * One of the where(...) overloads continues the transformation.
+   */
+  class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(fields: Int*) = {
+      new JoinPredicate[I1, I2](op, new FieldsKeySelector[I1](fields: _*))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields (by name, for POJO/case-class types) in the first stream
+     * to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(firstField: String, otherFields: String*) = {
+      new JoinPredicate[I1, I2](op,
+        new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the key selector function that will be used to extract keys from the
+     * first stream for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where[K: TypeInformation](fun: (I1) => K) = {
+      val keyExtractor = new KeySelector[I1, K] {
+        // Closure-clean once at graph-construction time.
+        val cleanFun = op.input1.clean(fun)
+        def getKey(in: I1) = cleanFun(in)
+      }
+      new JoinPredicate[I1, I2](op, keyExtractor)
+    }
+
+  }
+
+  /**
+   * A join with the first key defined. equalTo(...) supplies the second key
+   * and immediately constructs the joined stream.
+   */
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
+    private[flink] val keys1: KeySelector[I1, _]) {
+
+    // Assigned by finish() once the second key is known.
+    private[flink] var keys2: KeySelector[I2, _] = null
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.with(...)
+     */
+    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+      finish(new FieldsKeySelector[I2](fields: _*))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.with(...)
+     */
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = {
+      finish(new PojoKeySelector[I2](op.input2.getType(), (firstField +: otherFields): _*))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.with(...)
+     */
+    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+      val keyExtractor = new KeySelector[I2, K] {
+        // Fixed: was op.input1.clean(fun) — cleaned via the second input to
+        // stay consistent with the stream this key is extracted from.
+        val cleanFun = op.input2.clean(fun)
+        def getKey(in: I2) = cleanFun(in)
+      }
+      finish(keyExtractor)
+    }
+
+    // Stores the second key and builds the resulting joined stream.
+    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+      this.keys2 = keys2
+      new JoinedStream[I1, I2](this, createJoinOperator())
+    }
+
+    // Builds the underlying windowed co-group; the default output wraps each
+    // matching pair into a Scala Tuple2 (first, second).
+    private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+      // Type information for the (I1, I2) result, with a serializer that
+      // materializes Scala tuples from the deserialized fields.
+      val returnType = new CaseClassTypeInfo[(I1, I2)](
+        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+        override def createSerializer: TypeSerializer[(I1, I2)] = {
+          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+          for (i <- 0 until getArity) {
+            fieldSerializers(i) = types(i).createSerializer
+          }
+
+          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+            override def createInstance(fields: Array[AnyRef]) = {
+              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+            }
+          }
+        }
+      }
+
+      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(
+        getJoinWindowFunction(this, (_, _)),
+        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+    }
+  }
+
+  /**
+   * The finished join, pre-wrapped as a stream of (first, second) pairs.
+   * apply(...) replaces the default pair-wrapping with a custom join function
+   * by swapping the invokable on the already-built stream node.
+   */
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)])
extends DataStream[(I1, I2)](javaStream) {
+
+    private val op = jp.op
+
+    /** Completes the join by applying a custom result function to each matched pair. */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
op.timeStamp2)
+
+      // Mutates the existing graph node in place rather than adding a new one.
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
invokable)
+
+      new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+    }
+  }
+
+  /**
+   * Wraps a Scala (I1, I2) => R function into a Flink JoinWindowFunction,
+   * pairing it with the two key selectors from the join predicate.
+   */
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction:
(I1, I2) => R) = {
+    Validate.notNull(joinFunction, "Join function must not be null.")
+
+    val joinFun = new JoinFunction[I1, I2, R] {
+
+      // Closure-clean once so the anonymous wrapper is serializable.
+      val cleanFun = clean(joinFunction)
+
+      override def join(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index ff89a47..c686497 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -61,7 +61,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
   def groupBy(fields: Int*): WindowedDataStream[T] =
-    new WindowedDataStream[T](javaStream.groupBy(fields: _*))
+    new WindowedDataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
 
   /**
    * Groups the elements of the WindowedDataStream using the given


Mime
View raw message