Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D1E7310BCF for ; Sun, 4 Jan 2015 20:51:22 +0000 (UTC) Received: (qmail 37722 invoked by uid 500); 4 Jan 2015 20:51:23 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 37694 invoked by uid 500); 4 Jan 2015 20:51:23 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 37685 invoked by uid 99); 4 Jan 2015 20:51:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jan 2015 20:51:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 04 Jan 2015 20:50:54 +0000 Received: (qmail 37385 invoked by uid 99); 4 Jan 2015 20:50:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jan 2015 20:50:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DB8F0A3E8D9; Sun, 4 Jan 2015 20:50:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.incubator.apache.org Date: Sun, 04 Jan 2015 20:51:09 -0000 Message-Id: <2394bb95016f4646b8c39b59e4e48446@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/27] incubator-flink git commit: [scala] [streaming] added group windowing example with different policy types X-Virus-Checked: Checked by ClamAV on apache.org [scala] [streaming] added group windowing example with different policy types [scala] [streaming] updated example with scala delta helper usage Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a761cdcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a761cdcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a761cdcb Branch: refs/heads/master Commit: a761cdcb7186a8eb565de2d6313cf2087b3ee087 Parents: d4ec009 Author: senorcarbone Authored: Fri Jan 2 11:15:49 2015 +0100 Committer: carbone Committed: Sat Jan 3 20:21:12 2015 +0100 ---------------------------------------------------------------------- .../windowing/TopSpeedWindowingExample.java | 121 +++++++++++++++++++ .../streaming/windowing/TopSpeedWindowing.scala | 93 ++++++++++++++ 2 files changed, 214 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a761cdcb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java new file mode 100644 index 0000000..bc3bba5 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.windowing; + + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.windowing.helper.Delta; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.util.Collector; + +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * An example of grouped stream windowing where different eviction and trigger policies can be used. + * A source fetches events from cars every 1 sec containing their id, their current speed (kmh), + * overall elapsed distance (m) and a timestamp. The streaming + * example triggers the top speed of each car every x meters elapsed for the last y seconds. + */ +public class TopSpeedWindowingExample { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + DataStream topSpeeds = env + .addSource(CarSource.create(numOfCars)) + .groupBy(0) + .window(Time.of(evictionSec, TimeUnit.SECONDS)) + .every(Delta.of( + new DeltaFunction>() { + @Override + public double getDelta(Tuple4 oldDataPoint, Tuple4 newDataPoint) { + return newDataPoint.f2 - oldDataPoint.f2; + } + } + , new Tuple4(0, 0, 0d, 0l), triggerMeters)) + .maxBy(1); + + topSpeeds.print(); + env.execute("CarTopSpeedWindowingExample"); + } + + private static class CarSource implements SourceFunction> { + private Integer[] speeds; + private Double[] distances; + + private Random rand = new Random(); + + private CarSource(int numOfCars) { + speeds = new Integer[numOfCars]; + distances = new Double[numOfCars]; + Arrays.fill(speeds, 50); + Arrays.fill(distances, 0d); + } + + public static CarSource create(int cars) { + return new CarSource(cars); + } + + @Override + public void invoke(Collector> collector) throws Exception { + + while (true) { + Thread.sleep(1000); + for (int carId = 0; carId < speeds.length; carId++) { + if (rand.nextBoolean()) + speeds[carId] = Math.min(100, speeds[carId] + 5); + else + speeds[carId] = Math.max(0, speeds[carId] - 5); + distances[carId] += speeds[carId] / 3.6d; + collector.collect(new Tuple4(carId, speeds[carId], distances[carId], System.currentTimeMillis())); + } + } + } + } + + private static int numOfCars = 2; + private static int evictionSec = 10; + private static double triggerMeters = 50; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if (args.length == 3) { + numOfCars = Integer.valueOf(args[0]); + evictionSec = Integer.valueOf(args[1]); + triggerMeters = Double.valueOf(args[2]); + } else { + System.err.println("Usage: TopSpeedWindowingExample "); + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a761cdcb/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala new file mode 100644 index 0000000..10ddfd8 --- /dev/null +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.streaming.windowing + + +import java.util.concurrent.TimeUnit._ +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment +import org.apache.flink.api.scala.streaming.windowing.Delta +import org.apache.flink.streaming.api.windowing.helper.Time +import org.apache.flink.util.Collector + +import scala.math.{max, min} +import scala.util.Random + +/** + * An example of grouped stream windowing where different eviction and trigger policies can be used. + * A source fetches events from cars every 1 sec containing their id, their current speed (kmh), + * overall elapsed distance (m) and a timestamp. The streaming + * example triggers the top speed of each car every x meters elapsed for the last y seconds. + */ +object TopSpeedWindowing { + + case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long) + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val cars = env.addSource(carSource _).groupBy("carId") + .window(Time.of(evictionSec, SECONDS)) + .every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0))) + .reduce((x, y) => if (x.speed > y.speed) x else y) + + cars print + + env.execute("TopSpeedWindowing") + + } + + def carSource(out: Collector[CarSpeed]) = { + + val speeds = new Array[Int](numOfCars) + val distances = new Array[Double](numOfCars) + + while (true) { + Thread sleep 1000 + for (i <- 0 until speeds.length) { + speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5) + distances(i) += speeds(i) / 3.6d + out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis)) + } + } + } + + def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + if (args.length == 3) { + numOfCars = args(0).toInt + evictionSec = args(1).toInt + triggerMeters = args(2).toDouble + } + else { + System.err.println("Usage: TopSpeedWindowing ") + false + } + } + true + } + + var numOfCars = 2 + var evictionSec = 10 + var triggerMeters = 50d + +}