flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [19/27] incubator-flink git commit: [scala] [streaming] added group windowing example with different policy types
Date Sun, 04 Jan 2015 20:51:09 GMT
[scala] [streaming] added group windowing example with different policy types

[scala] [streaming] updated example with scala delta helper usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a761cdcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a761cdcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a761cdcb

Branch: refs/heads/master
Commit: a761cdcb7186a8eb565de2d6313cf2087b3ee087
Parents: d4ec009
Author: senorcarbone <seniorcarbone@gmail.com>
Authored: Fri Jan 2 11:15:49 2015 +0100
Committer: carbone <seniorcarbone@gmail.com>
Committed: Sat Jan 3 20:21:12 2015 +0100

----------------------------------------------------------------------
 .../windowing/TopSpeedWindowingExample.java     | 121 +++++++++++++++++++
 .../streaming/windowing/TopSpeedWindowing.scala |  93 ++++++++++++++
 2 files changed, 214 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a761cdcb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
new file mode 100644
index 0000000..bc3bba5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An example of grouped stream windowing where different eviction and trigger policies can
be used.
+ * A source fetches events from cars every 1 sec containing their id, their current speed
(kmh),
+ * overall elapsed distance (m) and a timestamp. The streaming
+ * example triggers the top speed of each car every x meters elapsed for the last y seconds.
+ */
+public class TopSpeedWindowingExample {
+
+    public static void main(String[] args) throws Exception {
+
+        if (!parseParameters(args)) {
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        DataStream topSpeeds = env
+                .addSource(CarSource.create(numOfCars))
+                .groupBy(0)
+                .window(Time.of(evictionSec, TimeUnit.SECONDS))
+                .every(Delta.of(
+                        new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>()
{
+                            @Override
+                            public double getDelta(Tuple4<Integer, Integer, Double, Long>
oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+                                return newDataPoint.f2 - oldDataPoint.f2;
+                            }
+                        }
+                        , new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l),
triggerMeters))
+                .maxBy(1);
+
+        topSpeeds.print();
+        env.execute("CarTopSpeedWindowingExample");
+    }
+
+    private static class CarSource implements SourceFunction<Tuple4<Integer, Integer,
Double, Long>> {
+        private Integer[] speeds;
+        private Double[] distances;
+
+        private Random rand = new Random();
+
+        private CarSource(int numOfCars) {
+            speeds = new Integer[numOfCars];
+            distances = new Double[numOfCars];
+            Arrays.fill(speeds, 50);
+            Arrays.fill(distances, 0d);
+        }
+
+        public static CarSource create(int cars) {
+            return new CarSource(cars);
+        }
+
+        @Override
+        public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>>
collector) throws Exception {
+
+            while (true) {
+                Thread.sleep(1000);
+                for (int carId = 0; carId < speeds.length; carId++) {
+                    if (rand.nextBoolean())
+                        speeds[carId] = Math.min(100, speeds[carId] + 5);
+                    else
+                        speeds[carId] = Math.max(0, speeds[carId] - 5);
+                    distances[carId] += speeds[carId] / 3.6d;
+                    collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId,
speeds[carId], distances[carId], System.currentTimeMillis()));
+                }
+            }
+        }
+    }
+
+    private static int numOfCars = 2;
+    private static int evictionSec = 10;
+    private static double triggerMeters = 50;
+
+    private static boolean parseParameters(String[] args) {
+
+        if (args.length > 0) {
+            if (args.length == 3) {
+                numOfCars = Integer.valueOf(args[0]);
+                evictionSec = Integer.valueOf(args[1]);
+                triggerMeters = Double.valueOf(args[2]);
+            } else {
+                System.err.println("Usage: TopSpeedWindowingExample <numCars> <evictSec>
<triggerMeters>");
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a761cdcb/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..10ddfd8
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.streaming.windowing
+
+
+import java.util.concurrent.TimeUnit._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
+import org.apache.flink.api.scala.streaming.windowing.Delta
+import org.apache.flink.streaming.api.windowing.helper.Time
+import org.apache.flink.util.Collector
+
+import scala.math.{max, min}
+import scala.util.Random
+
+/**
+ * An example of grouped stream windowing where different eviction and trigger policies can
be used.
+ * A source fetches events from cars every 1 sec containing their id, their current speed
(kmh),
+ * overall elapsed distance (m) and a timestamp. The streaming
+ * example triggers the top speed of each car every x meters elapsed for the last y seconds.
+ */
+object TopSpeedWindowing {
+
+  case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val cars = env.addSource(carSource _).groupBy("carId")
+      .window(Time.of(evictionSec, SECONDS))
+      .every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance,
CarSpeed(0,0,0,0)))
+      .reduce((x, y) => if (x.speed > y.speed) x else y)
+
+    cars print
+
+    env.execute("TopSpeedWindowing")
+
+  }
+
+  def carSource(out: Collector[CarSpeed]) = {
+
+    val speeds = new Array[Int](numOfCars)
+    val distances = new Array[Double](numOfCars)
+
+    while (true) {
+      Thread sleep 1000
+      for (i <- 0 until speeds.length) {
+        speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i)
- 5)
+        distances(i) += speeds(i) / 3.6d
+        out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
+      }
+    }
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+      }
+      else {
+        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+        false
+      }
+    }
+    true
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}


Mime
View raw message