flink-commits mailing list archives

From mbala...@apache.org
Subject [21/27] incubator-flink git commit: [scala] [streaming] Added connect op to DataStream and implicit conversion for ConnectedDataStreams
Date Sun, 04 Jan 2015 20:51:11 GMT
[scala] [streaming] Added connect op to DataStream and implicit conversion for ConnectedDataStreams

[scala] [streaming] Changed return types to implicits
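
For context, a minimal usage sketch of the connect op and the implicit
conversions added in this commit (stream1 and stream2 are hypothetical
DataStreams of different element types; the API names follow the diff below):

    import org.apache.flink.api.scala.streaming.StreamingConversions._

    // connect pairs two DataStreams of different element types so that
    // CoFunctions (CoMap, CoFlatMap, CoReduce, ...) can be applied to both
    val connected: ConnectedDataStream[String, Int] = stream1 connect stream2

    // map takes one function per input; both must return the same output
    // type, so the result is a single DataStream[String]
    val merged: DataStream[String] = connected.map(s => s, i => i.toString)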


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b9d0241f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b9d0241f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b9d0241f

Branch: refs/heads/master
Commit: b9d0241f333ea8ab8fcf31fc2e189888c0315513
Parents: 55ef795
Author: carbone <seniorcarbone@gmail.com>
Authored: Sat Jan 3 20:00:13 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Jan 3 21:44:30 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |   6 +-
 .../windowing/TopSpeedWindowingExample.java     | 175 ++++++++++---------
 .../streaming/windowing/TopSpeedWindowing.scala |  11 +-
 .../scala/streaming/ConnectedDataStream.scala   | 171 +++++++++---------
 .../flink/api/scala/streaming/DataStream.scala  |  15 +-
 .../scala/streaming/StreamingConversions.scala  |   6 +-
 6 files changed, 208 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 287f29d..e81395d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
-		return dataStream.transform("NextGenWindowReduce", getType(),
+		return dataStream.transform("WindowReduce", getType(),
 				getReduceInvokable(reduceFunction));
 	}
 
@@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor
 				.getGroupReduceReturnTypes(reduceFunction, inType);
 
-		return dataStream.transform("NextGenWindowReduce", outType,
+		return dataStream.transform("WindowReduce", outType,
 				getReduceGroupInvokable(reduceFunction));
 	}
 
@@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> {
 	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
 			GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) {
 
-		return dataStream.transform("NextGenWindowReduce", outType,
+		return dataStream.transform("WindowReduce", outType,
 				getReduceGroupInvokable(reduceFunction));
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index bc3bba5..0f5d8eb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,90 +31,98 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 /**
- * An example of grouped stream windowing where different eviction and trigger policies can be used.
- * A source fetches events from cars every 1 sec containing their id, their current speed (kmh),
- * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed for the last y seconds.
+ * An example of grouped stream windowing where different eviction and trigger
+ * policies can be used. A source fetches events from cars every 1 sec
+ * containing their id, their current speed (kmh), overall elapsed distance (m)
+ * and a timestamp. The streaming example triggers the top speed of each car
+ * every x meters elapsed for the last y seconds.
  */
 public class TopSpeedWindowingExample {
 
-    public static void main(String[] args) throws Exception {
-
-        if (!parseParameters(args)) {
-            return;
-        }
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        DataStream topSpeeds = env
-                .addSource(CarSource.create(numOfCars))
-                .groupBy(0)
-                .window(Time.of(evictionSec, TimeUnit.SECONDS))
-                .every(Delta.of(
-                        new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
-                            @Override
-                            public double getDelta(Tuple4<Integer, Integer, Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) {
-                                return newDataPoint.f2 - oldDataPoint.f2;
-                            }
-                        }
-                        , new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l), triggerMeters))
-                .maxBy(1);
-
-        topSpeeds.print();
-        env.execute("CarTopSpeedWindowingExample");
-    }
-
-    private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-        private Integer[] speeds;
-        private Double[] distances;
-
-        private Random rand = new Random();
-
-        private CarSource(int numOfCars) {
-            speeds = new Integer[numOfCars];
-            distances = new Double[numOfCars];
-            Arrays.fill(speeds, 50);
-            Arrays.fill(distances, 0d);
-        }
-
-        public static CarSource create(int cars) {
-            return new CarSource(cars);
-        }
-
-        @Override
-        public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector) throws Exception {
-
-            while (true) {
-                Thread.sleep(1000);
-                for (int carId = 0; carId < speeds.length; carId++) {
-                    if (rand.nextBoolean())
-                        speeds[carId] = Math.min(100, speeds[carId] + 5);
-                    else
-                        speeds[carId] = Math.max(0, speeds[carId] - 5);
-                    distances[carId] += speeds[carId] / 3.6d;
-                    collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId, speeds[carId], distances[carId], System.currentTimeMillis()));
-                }
-            }
-        }
-    }
-
-    private static int numOfCars = 2;
-    private static int evictionSec = 10;
-    private static double triggerMeters = 50;
-
-    private static boolean parseParameters(String[] args) {
-
-        if (args.length > 0) {
-            if (args.length == 3) {
-                numOfCars = Integer.valueOf(args[0]);
-                evictionSec = Integer.valueOf(args[1]);
-                triggerMeters = Double.valueOf(args[2]);
-            } else {
-                System.err.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
-                return false;
-            }
-        }
-        return true;
-    }
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings({ "rawtypes", "serial" })
+		DataStream topSpeeds = env
+				.addSource(CarSource.create(numOfCars))
+				.groupBy(0)
+				.window(Time.of(evictionSec, TimeUnit.SECONDS))
+				.every(Delta.of(triggerMeters,
+						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
+							@Override
+							public double getDelta(
+									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
+									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+								return newDataPoint.f2 - oldDataPoint.f2;
+							}
+						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).maxBy(1);
+
+		topSpeeds.print();
+		env.execute("CarTopSpeedWindowingExample");
+	}
+
+	private static class CarSource implements
+			SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+		private static final long serialVersionUID = 1L;
+		private Integer[] speeds;
+		private Double[] distances;
+
+		private Random rand = new Random();
+
+		private CarSource(int numOfCars) {
+			speeds = new Integer[numOfCars];
+			distances = new Double[numOfCars];
+			Arrays.fill(speeds, 50);
+			Arrays.fill(distances, 0d);
+		}
+
+		public static CarSource create(int cars) {
+			return new CarSource(cars);
+		}
+
+		@Override
+		public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
+				throws Exception {
+
+			while (true) {
+				Thread.sleep(1000);
+				for (int carId = 0; carId < speeds.length; carId++) {
+					if (rand.nextBoolean()) {
+						speeds[carId] = Math.min(100, speeds[carId] + 5);
+					} else {
+						speeds[carId] = Math.max(0, speeds[carId] - 5);
+					}
+					distances[carId] += speeds[carId] / 3.6d;
+					collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId,
+							speeds[carId], distances[carId], System.currentTimeMillis()));
+				}
+			}
+		}
+	}
+
+	private static int numOfCars = 2;
+	private static int evictionSec = 10;
+	private static double triggerMeters = 50;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 3) {
+				numOfCars = Integer.valueOf(args[0]);
+				evictionSec = Integer.valueOf(args[1]);
+				triggerMeters = Double.valueOf(args[2]);
+			} else {
+				System.err
+						.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
+				return false;
+			}
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
index 10ddfd8..e39fb11 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
@@ -30,10 +30,12 @@ import scala.math.{max, min}
 import scala.util.Random
 
 /**
- * An example of grouped stream windowing where different eviction and trigger policies can be used.
- * A source fetches events from cars every 1 sec containing their id, their current speed (kmh),
+ * An example of grouped stream windowing where different eviction and 
+ * trigger policies can be used. A source fetches events from cars
+ * every 1 sec containing their id, their current speed (kmh),
  * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed for the last y seconds.
+ * example triggers the top speed of each car every x meters elapsed 
+ * for the last y seconds.
  */
 object TopSpeedWindowing {
 
@@ -47,7 +49,8 @@ object TopSpeedWindowing {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val cars = env.addSource(carSource _).groupBy("carId")
       .window(Time.of(evictionSec, SECONDS))
-      .every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
+      .every(Delta.of[CarSpeed](triggerMeters, 
+          (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
       .reduce((x, y) => if (x.speed > y.speed) x else y)
 
     cars print

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
index 1c3f5a1..985e512 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +20,19 @@ package org.apache.flink.api.scala.streaming
 
 import java.util
 
-import scala.collection.JavaConversions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
-import org.apache.flink.streaming.api.function.co.{CoWindowFunction, CoFlatMapFunction, CoMapFunction, CoReduceFunction}
-import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable}
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaCStream }
+import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction }
+import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
 import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
-
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
+class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
   /**
    * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
@@ -40,21 +41,22 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * @param fun2 for each element of the second input. Each
    * CoMapFunction call returns exactly one element.
    *
-	 * The CoMapFunction used to jointly transform the two input
+   * The CoMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
+  DataStream[R] = {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("Map function must not be null.")
     }
-    val comapper = new CoMapFunction[IN1,IN2,R] {
-       def map1(in1: IN1): R = clean(fun1)(in1)
-       def map2(in2: IN2): R = clean(fun2)(in2)
+    val comapper = new CoMapFunction[IN1, IN2, R] {
+      def map1(in1: IN1): R = clean(fun1)(in1)
+      def map2(in2: IN2): R = clean(fun2)(in2)
     }
 
-    new DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
-    new CoMapInvokable[IN1,IN2,R](comapper)))
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](comapper)))
   }
 
   /**
@@ -67,17 +69,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * the {@link RichFuntion} interface.
    *
    * @param coMapper
-	 * The CoMapFunction used to jointly transform the two input
+   * The CoMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1,IN2,R]): DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
     if (coMapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1,IN2,R](coMapper)))
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](coMapper)))
   }
 
   /**
@@ -91,16 +94,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * interface.
    *
    * @param coFlatMapper
-	 * The CoFlatMapFunction used to jointly transform the two input
+   * The CoFlatMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1,IN2,R]): DataStream[R] = {
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
+  DataStream[R] = {
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
     new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
-      new CoFlatMapInvokable[IN1,IN2, R](coFlatMapper)))
+      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
   }
 
   /**
@@ -110,16 +114,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * and @param fun2 for each element of the second
    * input. Each CoFlatMapFunction call returns any number of elements
    * including none.
-
+   *
    * @return The transformed { @link DataStream}
    */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
+      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
-    val flatMapper = new CoFlatMapFunction[IN1,IN2, R] {
-       def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value,out)
-       def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value,out)
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
     }
     flatMap(flatMapper)
   }
@@ -131,15 +136,15 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param keyPosition1
-	 * The field used to compute the hashcode of the elements in the
+   * The field used to compute the hashcode of the elements in the
    * first input stream.
    * @param keyPosition2
-	 * The field used to compute the hashcode of the elements in the
+   * The field used to compute the hashcode of the elements in the
    * second input stream.
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy(keyPosition1 : Int,keyPosition2: Int) : ConnectedDataStream[IN1,IN2] = {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPosition1,keyPosition2))
+  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPosition1, keyPosition2)
   }
 
   /**
@@ -149,13 +154,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param keyPositions1
-	 * The fields used to group the first input stream.
+   * The fields used to group the first input stream.
    * @param keyPositions2
-	 * The fields used to group the second input stream.
+   * The fields used to group the second input stream.
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy(keyPositions1 : Array[Int],keyPositions2: Array[Int]) : ConnectedDataStream[IN1,IN2] = {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPositions1,keyPositions2))
+  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPositions1, keyPositions2)
   }
 
   /**
@@ -166,13 +172,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * to drill down into objects, as in {@code "field1.getInnerField2()" }.
    *
    * @param field1
-	 * The grouping expression for the first input
+   * The grouping expression for the first input
    * @param field2
-	 * The grouping expression for the second input
+   * The grouping expression for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def groupBy(field1 : String, field2: String) : ConnectedDataStream[IN1,IN2] = {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(field1,field2))
+  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(field1, field2)
   }
 
   /**
@@ -184,13 +190,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * .
    *
    * @param fields1
-	 * The grouping expressions for the first input
+   * The grouping expressions for the first input
    * @param fields2
-	 * The grouping expressions for the second input
+   * The grouping expressions for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def groupBy(fields1 : Array[String],fields2: Array[String]) : ConnectedDataStream[IN1,IN2] = {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(fields1,fields2))
+  def groupBy(fields1: Array[String], fields2: Array[String]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(fields1, fields2)
   }
 
   /**
@@ -200,12 +207,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param fun1
-	 * The function used for grouping the first input
+   * The function used for grouping the first input
    * @param fun2
-	 * The function used for grouping the second input
+   * The function used for grouping the second input
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): ConnectedDataStream[IN1,IN2] = {
+  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+  ConnectedDataStream[IN1, IN2] = {
 
     val keyExtractor1 = new KeySelector[IN1, Any] {
       def getKey(in: IN1) = clean(fun1)(in)
@@ -214,7 +222,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
       def getKey(in: IN2) = clean(fun2)(in)
     }
 
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyExtractor1,keyExtractor2))
+    javaStream.groupBy(keyExtractor1, keyExtractor2)
   }
 
   /**
@@ -227,17 +235,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * the reduce function can be applied incrementally.
    *
    * @param coReducer
-	 * The { @link CoReduceFunction} that will be called for every
+   * The { @link CoReduceFunction} that will be called for every
    *             element of the inputs.
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1,IN2,R]): DataStream[R] = {
+  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): 
+  DataStream[R] = {
     if (coReducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
 
     new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
-      new CoReduceInvokable[IN1,IN2,R](coReducer)))
+      new CoReduceInvokable[IN1, IN2, R](coReducer)))
   }
 
   /**
@@ -256,20 +265,20 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    *
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1,IN1) => IN1, reducer2: (IN2,IN2) => IN2,
-                                            mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
+      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
     if (mapper1 == null || mapper2 == null) {
       throw new NullPointerException("Map functions must not be null.")
     }
     if (reducer1 == null || reducer2 == null) {
       throw new NullPointerException("Reduce functions must not be null.")
     }
-    
-    val reducer = new CoReduceFunction[IN1,IN2,R] {
-       def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1,value2)
-       def map2(value: IN2): R = clean(mapper2)(value)
-       def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1,value2)
-       def map1(value: IN1): R = clean(mapper1)(value)
+
+    val reducer = new CoReduceFunction[IN1, IN2, R] {
+      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
+      def map2(value: IN2): R = clean(mapper2)(value)
+      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
+      def map1(value: IN1): R = clean(mapper1)(value)
     }
     reduce(reducer)
   }
@@ -281,23 +290,24 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * default to compute windows.
    *
    * @param coWindowFunction
-	 * The { @link CoWindowFunction} that will be applied for the time
+   * The { @link CoWindowFunction} that will be applied for the time
    *             windows.
    * @param windowSize
-	 * Size of the windows that will be aligned for both streams in
+   * Size of the windows that will be aligned for both streams in
    * milliseconds.
    * @param slideInterval
-	 * After every function call the windows will be slid by this
+   * After every function call the windows will be slid by this
    * interval.
    *
    * @return The transformed { @link DataStream}.
    */
-  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: CoWindowFunction[IN1,IN2,R], windowSize:Long, slideInterval: Long) = {
-    if(coWindowFunction == null){
+  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
+      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+    if (coWindowFunction == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
 
-    new DataStream[R](javaStream.windowReduce(coWindowFunction, windowSize, slideInterval))
+    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
   }
 
   /**
@@ -307,26 +317,28 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * default to compute windows.
    *
    * @param coWindower
-	 * The coWindowing function to be applied for the time windows.
+   * The coWindowing function to be applied for the time windows.
    * @param windowSize
-	 * Size of the windows that will be aligned for both streams in
+   * Size of the windows that will be aligned for both streams in
    * milliseconds.
    * @param slideInterval
-	 * After every function call the windows will be slid by this
+   * After every function call the windows will be slid by this
    * interval.
    *
    * @return The transformed { @link DataStream}.
    */
-  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], Collector[R]) => Unit , windowSize:Long, slideInterval: Long) = {
-    if(coWindower == null){
+  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
+      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+    if (coWindower == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
 
-    val coWindowFun = new CoWindowFunction[IN1,IN2,R] {
-       def coWindow(first: util.List[IN1], second: util.List[IN2], out: Collector[R]): Unit = clean(coWindower)(first,second,out)
+    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
+      def coWindow(first: util.List[IN1], second: util.List[IN2], 
+          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
     }
 
-    new DataStream[R](javaStream.windowReduce(coWindowFun, windowSize, slideInterval))
+    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
   }
 
   /**
@@ -335,7 +347,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * @return The first DataStream.
    */
   def getFirst(): DataStream[IN1] = {
-     new DataStream[IN1](javaStream.getFirst)
+    javaStream.getFirst
   }
 
   /**
@@ -344,7 +356,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
    * @return The second DataStream.
    */
   def getSecond(): DataStream[IN2] = {
-    new DataStream[IN2](javaStream.getSecond)
+    javaStream.getSecond
   }
 
   /**
@@ -365,5 +377,4 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
     javaStream.getInputType2
   }
 
-
 }
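
A note on the "return types to implicits" change above: methods such as groupBy,
windowReduce, getFirst and getSecond now return the underlying Java stream
directly and rely on the implicit conversions to wrap it. A minimal sketch of
the pattern, assuming the conversions from StreamingConversions are in scope:

    import org.apache.flink.api.scala.streaming.StreamingConversions._

    def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
      // javaStream.groupBy returns the Java ConnectedDataStream; the declared
      // Scala return type triggers javaToScalaConnectedStream, so no explicit
      // new ConnectedDataStream(...) wrapping is needed
      javaStream.groupBy(keyPosition1, keyPosition2)
    }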

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 546d8a9..ccfd176 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -19,22 +19,20 @@
 package org.apache.flink.api.scala.streaming
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
+  SingleOutputStreamOperator, GroupedDataStream}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable
-import org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
@@ -94,6 +92,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.merge(dataStreams.map(_.getJavaStream): _*)
 
   /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different type with each other. The
+   * DataStreams connected using this operators can be used with CoFunctions.
+   *
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations
    *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b9d0241f/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
index a34d0dc..9aefa04 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
@@ -21,9 +21,10 @@ package org.apache.flink.api.scala.streaming
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
 
 object StreamingConversions {
-  
+
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
 
@@ -33,4 +34,7 @@ object StreamingConversions {
   implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
     new SplitDataStream[R](javaStream)
 
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
+  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
 }

