flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/5] incubator-flink git commit: [scala] [streaming] added scala streams as sources in streaming-api scala examples
Date Tue, 06 Jan 2015 14:43:20 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 92ceacd23 -> baf81c6c3


[scala] [streaming] added scala streams as sources in streaming-api scala examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/baf81c6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/baf81c6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/baf81c6c

Branch: refs/heads/master
Commit: baf81c6c31016d4ef02584fea1b6d2d4db1168c2
Parents: 5daa45c
Author: carbone <seniorcarbone@gmail.com>
Authored: Tue Jan 6 13:53:01 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jan 6 15:09:04 2015 +0100

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.scala  | 44 ++++++++++----------
 .../scala/examples/windowing/WindowJoin.scala   | 36 ++++++++--------
 2 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/baf81c6c/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index a18eb37..a43f479 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -21,15 +21,13 @@ package org.apache.flink.streaming.scala.examples.windowing
 
 import java.util.concurrent.TimeUnit._
 
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
-import scala.math.{max, min}
-
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.windowing.{Delta, Time}
+import org.apache.flink.api.scala._
+import scala.Stream._
+import scala.math._
 import scala.util.Random
 
-import org.apache.flink.streaming.api.scala.windowing.Time
-import org.apache.flink.streaming.api.scala.windowing.Delta
-
 /**
  * An example of grouped stream windowing where different eviction and 
  * trigger policies can be used. A source fetches events from cars 
@@ -40,7 +38,7 @@ import org.apache.flink.streaming.api.scala.windowing.Delta
  */
 object TopSpeedWindowing {
 
-  case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
 
   def main(args: Array[String]) {
     if (!parseParameters(args)) {
@@ -48,10 +46,11 @@ object TopSpeedWindowing {
     }
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val cars = env.addSource(carSource _).groupBy("carId")
+    val cars = env.fromCollection(genCarStream())
+      .groupBy("carId")
       .window(Time.of(evictionSec, SECONDS))
-      .every(Delta.of[CarSpeed](triggerMeters, 
-          (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
+      .every(Delta.of[CarEvent](triggerMeters,
+          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
       .reduce((x, y) => if (x.speed > y.speed) x else y)
 
     cars print
@@ -60,19 +59,20 @@ object TopSpeedWindowing {
 
   }
 
-  def carSource(out: Collector[CarSpeed]) = {
-
-    val speeds = new Array[Int](numOfCars)
-    val distances = new Array[Double](numOfCars)
+  def genCarStream(): Stream[CarEvent] = {
 
-    while (true) {
-      Thread sleep 1000
-      for (i <- 0 until speeds.length) {
-        speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
-        distances(i) += speeds(i) / 3.6d
-        out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
-      }
+    def nextSpeed(carEvent : CarEvent) : CarEvent =
+    {
+      val next = 
+        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
+    }
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
+    {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
     }
+    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
   }
 
   def parseParameters(args: Array[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/baf81c6c/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index 08c7d65..d6c0363 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -18,8 +18,10 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+import scala.Stream._
 import scala.util.Random
 import java.util.concurrent.TimeUnit
 
@@ -34,8 +36,8 @@ object WindowJoin {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     //Create streams for names and ages by mapping the inputs to the corresponding objects
-    val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
-    val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
+    val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
+    val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
 
     //Join the two input streams by id on the last second every 2 seconds and create new
     //Person objects containing both name and age
@@ -48,24 +50,22 @@ object WindowJoin {
     env.execute("WindowJoin")
   }
 
-  //Stream source for generating (id, name) pairs
-  def nameStream(out: Collector[(Long, String)]) = {
-    val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
-
-    for (i <- 1 to 10000) {
-      if (i % 100 == 0) Thread.sleep(1000) else {
-        out.collect((i, names(Random.nextInt(names.length))))
-      }
+  def nameStream() : Stream[(Long,String)] = {
+    def nameMapper(names: Array[String])(x: Int) : (Long, String) =
+    {
+      if(x%100==0) Thread.sleep(1000)
+      (x, names(Random.nextInt(names.length)))
     }
+    range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", "john", "grace")))
   }
 
-  //Stream source for generating (id, age) pairs
-  def ageStream(out: Collector[(Long, Int)]) = {
-    for (i <- 1 to 10000) {
-      if (i % 100 == 0) Thread.sleep(1000) else {
-        out.collect((i, Random.nextInt(90)))
-      }
+  def ageStream() : Stream[(Long,Int)] = {
+    def ageMapper(x: Int) : (Long, Int) =
+    {
+      if(x%100==0) Thread.sleep(1000)
+      (x, Random.nextInt(90))
     }
+    range(1,10000).map(ageMapper)
   }
 
 }


Mime
View raw message