From: mbalassi@apache.org
To: commits@flink.apache.org
Date: Thu, 08 Jan 2015 14:54:21 -0000
Message-Id: <452662185f6f499e91a6d74bf3c8e6ef@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm
Reply-To: dev@flink.incubator.apache.org
Subject: [1/3] flink git commit: [FLINK-1367] [scala] [streaming] Field aggregations added to streaming scala api

Repository: flink
Updated Branches:
  refs/heads/master 19066b520 -> 06503c8fd


[FLINK-1367] [scala] [streaming] Field aggregations added to streaming scala api

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06503c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06503c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06503c8f

Branch: refs/heads/master
Commit: 06503c8fd3506fb83bd09392856c465f8d095736
Parents: 10a8186
Author: Gyula Fora
Authored: Wed Jan 7 16:30:09 2015 +0100
Committer: mbalassi
Committed: Thu Jan 8 13:35:28 2015 +0100

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.scala |  2 +-
 .../flink/streaming/api/scala/DataStream.scala | 65 ++++++++++++++++----
 .../api/scala/WindowedDataStream.scala         | 53 ++++++++++++++--
 .../flink/streaming/api/scala/package.scala    | 20 ++++++
 4 files changed, 120 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06503c8f/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index e3ef95e..8264b7f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -50,7 +50,7 @@ object TopSpeedWindowing {
       .window(Time.of(evictionSec, SECONDS))
       .every(Delta.of[CarEvent](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
-      .reduce((x, y) => if (x.speed > y.speed) x else y)
+      .maxBy("speed")
 
     cars print


http://git-wip-us.apache.org/repos/asf/flink/blob/06503c8f/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index dfaa316..f0ec78f 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Sets the partitioning of the DataStream so that the output values all go to
    * the first instance of the next processing operator. Use this setting with care
-   * since it might cause a serious performance bottlenect in the application.
+   * since it might cause a serious performance bottleneck in the application.
    */
   def global: DataStream[T] = javaStream.global()
 
@@ -203,39 +203,78 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-  
+
+  /**
+   * Applies an aggregation that that gives the current maximum of the data stream at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
   /**
    * Applies an aggregation that that gives the current minimum of the data stream at
    * the given position.
    *
    */
   def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that that gives the current minimum of the data stream at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
 
   /**
    * Applies an aggregation that sums the data stream at the given position.
    *
    */
   def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
 
   /**
    * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with
-   * the minimal value.
+   * the given position. When equality, the first element is returned with the minimal value.
+   *
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType
+    .MINBY, position)
+
+  /**
+   * Applies an aggregation that that gives the current minimum element of the data stream by
+   * the given field. When equality, the first element is returned with the minimal value.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
-    .MINBY, position, first)
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType
+    .MINBY, field )
 
-  /**
+  /**
+   * Applies an aggregation that that gives the current maximum element of the data stream by
+   * the given position. When equality, the first element is returned with the maximal value.
+   *
+   */
+  def maxBy(position: Int): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position)
+
+  /**
    * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with
-   * the maximal value.
+   * the given field. When equality, the first element is returned with the maximal value.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(field: String): DataStream[T] =
+    aggregate(AggregationType.MAXBY, field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
 
-  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  private def aggregate(aggregationType: AggregationType, position: Int):
     DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
@@ -246,7 +285,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
         getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 
     val invokable = jStream match {


http://git-wip-us.apache.org/repos/asf/flink/blob/06503c8f/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 0b88137..1908939 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -157,6 +157,13 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
   def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the window at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
 
   /**
    * Applies an aggregation that that gives the minimum of the elements in the window at
@@ -164,30 +171,64 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
   def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the window at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
 
   /**
    * Applies an aggregation that sums the elements in the window at the given position.
    *
    */
   def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
 
   /**
    * Applies an aggregation that that gives the maximum element of the window by
    * the given position. When equality, returns the first.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position, first)
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window by
+   * the given field. When equality, returns the first.
+   *
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
 
   /**
    * Applies an aggregation that that gives the minimum element of the window by
    * the given position. When equality, returns the first.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
-    position, first)
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window by
+   * the given field. When equality, returns the first.
+   *
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
 
-  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  def aggregate(aggregationType: AggregationType, position: Int):
     DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
@@ -198,7 +239,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
         outType.getTypeAt(position).getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 
     new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]


http://git-wip-us.apache.org/repos/asf/flink/blob/06503c8f/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 3604b55..2af370b 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -43,4 +43,24 @@ package object scala {
   implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
     ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
+  private[flink] def fieldNames2Indices(
+      typeInfo: TypeInformation[_],
+      fields: Array[String]): Array[Int] = {
+    typeInfo match {
+      case ti: CaseClassTypeInfo[_] =>
+        val result = ti.getFieldIndices(fields)
+
+        if (result.contains(-1)) {
+          throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
+            "' are not valid for '" + ti.toString + "'.")
+        }
+
+        result
+
+      case _ =>
+        throw new UnsupportedOperationException("Specifying fields by name is only" +
+          "supported on Case Classes (for now).")
+    }
+  }
 }
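
----------------------------------------------------------------------
A minimal usage sketch of the new field-name aggregations, modeled on the
CarEvent case class from the TopSpeedWindowing example above. The
fromElements source, the sample values, and the job name are assumptions
for illustration only; the maxBy/sum String overloads themselves come from
this commit, and on a plain (non-windowed) DataStream they emit a running
result, per the scaladoc above.

import org.apache.flink.streaming.api.scala._

// Mirrors the example's CarEvent(carId, speed, distance, time);
// the exact field types are assumed here for illustration.
case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

object FieldAggregationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical finite source, just to have something to aggregate.
    val cars: DataStream[CarEvent] = env.fromElements(
      CarEvent(0, 50, 10.0, 1L),
      CarEvent(0, 80, 25.0, 2L),
      CarEvent(1, 60, 12.0, 3L))

    // Before this commit only positional aggregations were available:
    val fastestByPosition = cars.maxBy(1)    // position 1 is the speed field

    // FLINK-1367 adds the equivalent name-based overloads:
    val fastestByName = cars.maxBy("speed")  // resolved via fieldNames2Indices
    val totalDistance = cars.sum("distance") // running sum of the distance field

    fastestByName.print()
    env.execute("Field aggregation sketch")
  }
}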
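
----------------------------------------------------------------------
The name resolution itself rests on the Scala CaseClassTypeInfo: for a case
class, a field name maps to its position in declaration order. Because
fieldNames2Indices is private[flink], the sketch below illustrates the
underlying getFieldIndices call directly; createTypeInformation is the
standard Scala API macro for obtaining the TypeInformation, and CarEvent is
the same illustrative case class as above.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

object FieldIndexSketch {
  def main(args: Array[String]): Unit = {
    // createTypeInformation yields a CaseClassTypeInfo for case classes.
    val typeInfo = createTypeInformation[CarEvent]
      .asInstanceOf[CaseClassTypeInfo[CarEvent]]

    // "speed" is the second constructor field, "distance" the third.
    val indices = typeInfo.getFieldIndices(Array("speed", "distance"))
    println(indices.mkString(", "))  // expected: 1, 2

    // An unknown field name yields -1, which fieldNames2Indices above
    // turns into an IllegalArgumentException.
    println(typeInfo.getFieldIndices(Array("velocity")).mkString(", "))  // -1
  }
}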