flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/3] flink git commit: [streaming] Java Stock streaming example added
Date Tue, 10 Feb 2015 07:51:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master bfb611f3e -> 6b402f43d


[streaming] Java Stock streaming example added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5752a76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5752a76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5752a76

Branch: refs/heads/master
Commit: b5752a76c4b835931131b48c89734d44fdee5dee
Parents: 2afc9ec
Author: mbalassi <mbalassi@apache.org>
Authored: Fri Feb 6 11:31:20 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 9 21:55:09 2015 +0100

----------------------------------------------------------------------
 .../scala/examples/windowing/StockPrices.scala  | 158 ---------
 .../examples/windowing/StockPrices.java         | 341 +++++++++++++++++++
 .../scala/examples/windowing/StockPrices.scala  | 166 +++++++++
 3 files changed, 507 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
deleted file mode 100644
index 0fee767..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.helper.Count
-import org.apache.flink.util.Collector
-import org.apache.flink.streaming.api.windowing.helper.Time
-import java.util.concurrent.TimeUnit._
-import org.apache.flink.streaming.api.scala.windowing.Delta
-import scala.util.Random
-
-object StockPrices {
-
-  case class StockPrice(symbol: String, price: Double)
-  case class Count(symbol: String, count: Int)
-
-  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
-
-  val defaultPrice = StockPrice("", 1000)
-
-  def main(args: Array[String]) {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    //Step 1 
-    //Read a stream of stock prices from different sources and merge it into one stream
-
-    //Read from a socket stream at map it to StockPrice objects
-    val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
-      val split = x.split(",")
-      StockPrice(split(0), split(1).toDouble)
-    })
-
-    //Generate other stock streams
-    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
-    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
-    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
-    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
-
-    //Merge all stock streams together
-    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
-
-    //Step 2
-    //Compute some simple statistics on a rolling window
-    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
-
-    val lowest = windowedStream.minBy("price").setParallelism(1)
-    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
-    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)
-
-    //Step 3 
-    //Use  delta policy to create price change warnings, and also count the number of warning every half minute
-
-    val priceWarnings = stockStream.groupBy("symbol")
-      .window(Delta.of(0.05, priceChange, defaultPrice))
-      .reduceGroup(sendWarning _)
-
-    val warningsPerStock = priceWarnings.map(Count(_, 1))
-      .groupBy("symbol")
-      .window(Time.of(30, SECONDS))
-      .sum("count")
-
-    //Step 4 
-    //Read a stream of tweets and extract the stock symbols
-
-    val tweetStream = env.addSource(generateTweets _)
-
-    val mentionedSymbols = tweetStream.flatMap(
-      tweet => for (word <- tweet.split(" ").map(_.toUpperCase()) if symbols.contains(word)) yield word)
-
-    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
-      .groupBy("symbol")
-      .window(Time.of(30, SECONDS))
-      .sum("count")
-
-    //Step 5
-    //For advanced analysis we join the number of tweets and the number of price change warnings by stock
-    //for the last half minute, we keep only the counts. We use this information to compute rolling correlations
-    //between the tweets and the price changes                              
-
-    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
-      .onWindow(30, SECONDS)
-      .where("symbol")
-      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
-
-    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS)).reduceGroup(computeCorrelation _).setParallelism(1)
-
-    rollingCorrelation.print
-
-    env.execute("Stock stream")
-  }
-
-  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
-    Math.abs(p1.price / p2.price - 1)
-  }
-
-  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
-    if (ts.nonEmpty) out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
-  }
-
-  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
-    if (ts.nonEmpty) out.collect(ts.head.symbol)
-  }
-
-  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
-    if (input.nonEmpty) {
-      val var1 = input.map(_._1)
-      val mean1 = average(var1)
-      val var2 = input.map(_._2)
-      val mean2 = average(var2)
-
-      val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
-      val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
-      val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
-
-      out.collect(cov / (d1 * d2))
-    }
-  }
-
-  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
-    var price = 1000.
-    while (true) {
-      price = price + Random.nextGaussian * sigma
-      out.collect(StockPrice(symbol, price))
-      Thread.sleep(Random.nextInt(200))
-    }
-  }
-
-  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
-    num.toDouble(ts.sum) / ts.size
-  }
-
-  def generateTweets(out: Collector[String]) = {
-    while (true) {
-      val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
-      out.collect(s.mkString(" "))
-      Thread.sleep(Random.nextInt(500))
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
new file mode 100644
index 0000000..c60b5ca
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class StockPrices {
+
+	private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));
+	private static final Double DEFAULT_PRICE = 1000.;
+	private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		//Step 1 
+	    //Read a stream of stock prices from different sources and merge it into one stream
+		
+		//Read from a socket stream at map it to StockPrice objects
+		DataStream<StockPrice> socketStockStream = env.socketTextStream("localhost", 9999)
+				.map(new MapFunction<String, StockPrice>() {
+					private String[] tokens;
+
+					@Override
+					public StockPrice map(String value) throws Exception {
+						tokens = value.split(",");
+						return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
+					}
+				});
+
+		//Generate other stock streams
+		DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
+		DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
+		DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
+		DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
+
+		//Merge all stock streams together
+		DataStream<StockPrice> stockStream = socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
+		
+		//Step 2
+	    //Compute some simple statistics on a rolling window
+		WindowedDataStream<StockPrice> windowedStream = stockStream
+				.window(Time.of(10, TimeUnit.SECONDS))
+				.every(Time.of(5, TimeUnit.SECONDS));
+
+		DataStream<StockPrice> lowest = windowedStream.minBy("price").setParallelism(1);
+		DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price");
+		DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").reduceGroup(new MeanReduce());
+
+		//Step 3
+		//Use  delta policy to create price change warnings, and also count the number of warning every half minute
+
+		DataStream<String> priceWarnings = stockStream.groupBy("symbol")
+				.window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
+					@Override
+					public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
+						return Math.abs(oldDataPoint.price - newDataPoint.price);
+					}
+				}, DEFAULT_STOCK_PRICE))
+				.reduceGroup(new SendWarning());
+
+
+		DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
+			@Override
+			public Count map(String value) throws Exception {
+				return new Count(value, 1);
+			}
+		}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count");
+
+		//Step 4
+		//Read a stream of tweets and extract the stock symbols
+		DataStream<String> tweetStream = env.addSource(new TweetSource());
+
+		DataStream<String> mentionedSymbols = tweetStream.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				String[] words = value.split(" ");
+				for (String word : words) {
+					out.collect(word.toUpperCase());
+				}
+			}
+		}).filter(new FilterFunction<String>() {
+			@Override
+			public boolean filter(String value) throws Exception {
+				return SYMBOLS.contains(value);
+			}
+		});
+
+		DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
+			@Override
+			public Count map(String value) throws Exception {
+				return new Count(value, 1);
+			}
+		}).groupBy("symbol")
+				.window(Time.of(30, TimeUnit.SECONDS))
+				.sum("count");
+
+		//Step 5
+		//For advanced analysis we join the number of tweets and the number of price change warnings by stock
+		//for the last half minute, we keep only the counts. We use this information to compute rolling correlations
+		//between the tweets and the price changes
+
+		DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
+				.onWindow(30, TimeUnit.SECONDS)
+				.where("symbol")
+				.equalTo("symbol")
+				.with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
+					@Override
+					public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
+						return new Tuple2<Integer, Integer>(first.count, second.count);
+					}
+				});
+
+		DataStream<Double> rollingCorrelation = tweetsAndWarning
+				.window(Time.of(30, TimeUnit.SECONDS))
+				.reduceGroup(new CorrelationReduce())
+				.setParallelism(1);
+
+		rollingCorrelation.print();
+
+		env.execute("Stock stream");
+
+	}
+
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	public static class StockPrice implements Serializable {
+
+		public String symbol;
+		public Double price;
+
+		public StockPrice() {
+		}
+
+		public StockPrice(String symbol, Double price) {
+			this.symbol = symbol;
+			this.price = price;
+		}
+
+		@Override
+		public String toString() {
+			return "StockPrice{" +
+					"symbol='" + symbol + '\'' +
+					", count=" + price +
+					'}';
+		}
+	}
+
+	public static class Count implements Serializable {
+		public String symbol;
+		public Integer count;
+
+		public Count() {
+		}
+
+		public Count(String symbol, Integer count) {
+			this.symbol = symbol;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return "Count{" +
+					"symbol='" + symbol + '\'' +
+					", count=" + count +
+					'}';
+		}
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	public final static class StockSource implements SourceFunction<StockPrice> {
+
+		private Double price;
+		private String symbol;
+		private Integer sigma;
+
+		public StockSource(String symbol, Integer sigma) {
+			this.symbol = symbol;
+			this.sigma = sigma;
+		}
+
+		@Override
+		public void invoke(Collector<StockPrice> collector) throws Exception {
+			price = DEFAULT_PRICE;
+			Random random = new Random();
+
+			while (true) {
+				price = price + random.nextGaussian() * sigma;
+				collector.collect(new StockPrice(symbol, price));
+				Thread.sleep(random.nextInt(200));
+			}
+		}
+	}
+
+	public final static class MeanReduce implements GroupReduceFunction<StockPrice, StockPrice> {
+
+		private Double sum = 0.0;
+		private Integer count = 0;
+		private String symbol = "";
+
+		@Override
+		public void reduce(Iterable<StockPrice> values, Collector<StockPrice> out) throws Exception {
+			if (values.iterator().hasNext()) {
+
+				for (StockPrice sp : values) {
+					sum += sp.price;
+					symbol = sp.symbol;
+					count++;
+				}
+				out.collect(new StockPrice(symbol, sum / count));
+			}
+		}
+	}
+
+	public static final class TweetSource implements SourceFunction<String> {
+
+		Random random;
+		StringBuilder stringBuilder;
+
+		@Override
+		public void invoke(Collector<String> collector) throws Exception {
+			random = new Random();
+			stringBuilder = new StringBuilder();
+
+			while (true) {
+				stringBuilder.setLength(0);
+				for (int i = 0; i < 3; i++) {
+					stringBuilder.append(" ");
+					stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
+				}
+				collector.collect(stringBuilder.toString());
+				Thread.sleep(500);
+			}
+
+		}
+	}
+
+	public static final class SendWarning implements GroupReduceFunction<StockPrice, String> {
+		@Override
+		public void reduce(Iterable<StockPrice> values, Collector<String> out) throws Exception {
+			if (values.iterator().hasNext()) {
+				out.collect(values.iterator().next().symbol);
+			}
+		}
+	}
+
+	public static final class CorrelationReduce implements GroupReduceFunction<Tuple2<Integer, Integer>, Double> {
+
+		private Integer leftSum;
+		private Integer rightSum;
+		private Integer count;
+
+		private Double leftMean;
+		private Double rightMean;
+
+		private Double cov;
+		private Double leftSd;
+		private Double rightSd;
+
+		@Override
+		public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) throws Exception {
+
+			leftSum = 0;
+			rightSum = 0;
+			count = 0;
+
+			cov = 0.;
+			leftSd = 0.;
+			rightSd = 0.;
+
+			//compute mean for both sides, save count
+			for (Tuple2<Integer, Integer> pair : values) {
+				leftSum += pair.f0;
+				rightSum += pair.f1;
+				count++;
+			}
+
+			leftMean = leftSum.doubleValue() / count;
+			rightMean = rightSum.doubleValue() / count;
+
+			//compute covariance & std. deviations
+			for (Tuple2<Integer, Integer> pair : values) {
+				cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
+			}
+
+			for (Tuple2<Integer, Integer> pair : values) {
+				leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
+				rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
+			}
+			leftSd = Math.sqrt(leftSd);
+			rightSd = Math.sqrt(rightSd);
+
+			out.collect(cov / (leftSd * rightSd));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
new file mode 100644
index 0000000..f357fe7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.windowing.Delta
+import org.apache.flink.streaming.api.windowing.helper.Time
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
+object StockPrices {
+
+  case class StockPrice(symbol: String, price: Double)
+  case class Count(symbol: String, count: Int)
+
+  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
+
+  val defaultPrice = StockPrice("", 1000)
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Step 1 
+    //Read a stream of stock prices from different sources and merge it into one stream
+
+    //Read from a socket stream at map it to StockPrice objects
+    val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
+      val split = x.split(",")
+      StockPrice(split(0), split(1).toDouble)
+    })
+
+    //Generate other stock streams
+    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
+    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
+    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
+    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+
+    //Merge all stock streams together
+    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
+
+    //Step 2
+    //Compute some simple statistics on a rolling window
+    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
+
+    val lowest = windowedStream.minBy("price").setParallelism(1)
+    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
+    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)
+
+    //Step 3 
+    //Use  delta policy to create price change warnings,
+    // and also count the number of warning every half minute
+
+    val priceWarnings = stockStream.groupBy("symbol")
+      .window(Delta.of(0.05, priceChange, defaultPrice))
+      .reduceGroup(sendWarning _)
+
+    val warningsPerStock = priceWarnings.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 4 
+    //Read a stream of tweets and extract the stock symbols
+
+    val tweetStream = env.addSource(generateTweets _)
+
+    val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
+      .map(_.toUpperCase())
+      .filter(symbols.contains(_))                     
+                    
+    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 5
+    //For advanced analysis we join the number of tweets and
+    //the number of price change warnings by stock
+    //for the last half minute, we keep only the counts.
+    //This information is used to compute rolling correlations
+    //between the tweets and the price changes                              
+
+    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
+      .onWindow(30, SECONDS)
+      .where("symbol")
+      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
+
+    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
+      .reduceGroup(computeCorrelation _).setParallelism(1)
+
+    rollingCorrelation.print
+
+    env.execute("Stock stream")
+  }
+
+  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
+    Math.abs(p1.price / p2.price - 1)
+  }
+
+  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
+    if (ts.nonEmpty) {
+      out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
+    }
+  }
+
+  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
+    if (ts.nonEmpty) out.collect(ts.head.symbol)
+  }
+
+  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
+    if (input.nonEmpty) {
+      val var1 = input.map(_._1)
+      val mean1 = average(var1)
+      val var2 = input.map(_._2)
+      val mean2 = average(var2)
+
+      val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
+      val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
+      val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
+
+      out.collect(cov / (d1 * d2))
+    }
+  }
+
+  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
+    var price = 1000.
+    while (true) {
+      price = price + Random.nextGaussian * sigma
+      out.collect(StockPrice(symbol, price))
+      Thread.sleep(Random.nextInt(200))
+    }
+  }
+
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
+    num.toDouble(ts.sum) / ts.size
+  }
+
+  def generateTweets(out: Collector[String]) = {
+    while (true) {
+      val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
+      out.collect(s.mkString(" "))
+      Thread.sleep(Random.nextInt(500))
+    }
+  }
+
+}


Mime
View raw message