flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/3] flink git commit: [streaming] Stock streaming example added
Date Tue, 10 Feb 2015 07:51:59 GMT
[streaming] Stock streaming example added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2afc9ec3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2afc9ec3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2afc9ec3

Branch: refs/heads/master
Commit: 2afc9ec351a47e31c091cba6389fb1d0ddae946e
Parents: bfb611f
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Jan 21 14:56:52 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 9 21:55:09 2015 +0100

----------------------------------------------------------------------
 .../scala/examples/windowing/StockPrices.scala  | 158 +++++++++++++++++++
 1 file changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2afc9ec3/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
new file mode 100644
index 0000000..0fee767
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.helper.Count
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.helper.Time
+import java.util.concurrent.TimeUnit._
+import org.apache.flink.streaming.api.scala.windowing.Delta
+import scala.util.Random
+
+object StockPrices {
+
+  case class StockPrice(symbol: String, price: Double)
+  case class Count(symbol: String, count: Int)
+
+  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
+
+  val defaultPrice = StockPrice("", 1000)
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Step 1 
+    //Read a stream of stock prices from different sources and merge it into one stream
+
+    //Read from a socket stream at map it to StockPrice objects
+    val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
+      val split = x.split(",")
+      StockPrice(split(0), split(1).toDouble)
+    })
+
+    //Generate other stock streams
+    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
+    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
+    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
+    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+
+    //Merge all stock streams together
+    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
+
+    //Step 2
+    //Compute some simple statistics on a rolling window
+    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
+
+    val lowest = windowedStream.minBy("price").setParallelism(1)
+    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
+    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)
+
+    //Step 3 
+    //Use  delta policy to create price change warnings, and also count the number of warning
every half minute
+
+    val priceWarnings = stockStream.groupBy("symbol")
+      .window(Delta.of(0.05, priceChange, defaultPrice))
+      .reduceGroup(sendWarning _)
+
+    val warningsPerStock = priceWarnings.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 4 
+    //Read a stream of tweets and extract the stock symbols
+
+    val tweetStream = env.addSource(generateTweets _)
+
+    val mentionedSymbols = tweetStream.flatMap(
+      tweet => for (word <- tweet.split(" ").map(_.toUpperCase()) if symbols.contains(word))
yield word)
+
+    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 5
+    //For advanced analysis we join the number of tweets and the number of price change warnings
by stock
+    //for the last half minute, we keep only the counts. We use this information to compute
rolling correlations
+    //between the tweets and the price changes                              
+
+    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
+      .onWindow(30, SECONDS)
+      .where("symbol")
+      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
+
+    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS)).reduceGroup(computeCorrelation
_).setParallelism(1)
+
+    rollingCorrelation.print
+
+    env.execute("Stock stream")
+  }
+
+  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
+    Math.abs(p1.price / p2.price - 1)
+  }
+
+  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
+    if (ts.nonEmpty) out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price)
/ ts.size))
+  }
+
+  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
+    if (ts.nonEmpty) out.collect(ts.head.symbol)
+  }
+
+  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
+    if (input.nonEmpty) {
+      val var1 = input.map(_._1)
+      val mean1 = average(var1)
+      val var2 = input.map(_._2)
+      val mean2 = average(var2)
+
+      val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
+      val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
+      val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
+
+      out.collect(cov / (d1 * d2))
+    }
+  }
+
+  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
+    var price = 1000.
+    while (true) {
+      price = price + Random.nextGaussian * sigma
+      out.collect(StockPrice(symbol, price))
+      Thread.sleep(Random.nextInt(200))
+    }
+  }
+
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
+    num.toDouble(ts.sum) / ts.size
+  }
+
+  def generateTweets(out: Collector[String]) = {
+    while (true) {
+      val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
+      out.collect(s.mkString(" "))
+      Thread.sleep(Random.nextInt(500))
+    }
+  }
+
+}


Mime
View raw message