Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C4752179BD for ; Wed, 7 Jan 2015 14:12:40 +0000 (UTC) Received: (qmail 74337 invoked by uid 500); 7 Jan 2015 14:12:41 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 74167 invoked by uid 500); 7 Jan 2015 14:12:41 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 73759 invoked by uid 99); 7 Jan 2015 14:12:41 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jan 2015 14:12:41 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 49772A26643; Wed, 7 Jan 2015 14:12:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.apache.org Date: Wed, 07 Jan 2015 14:12:51 -0000 Message-Id: <26622a2f3c2f4a66a1738453f8c08070@git.apache.org> In-Reply-To: <08b7183474144e87804c03951f2e23b5@git.apache.org> References: <08b7183474144e87804c03951f2e23b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/36] flink git commit: [scala] [streaming] Fixed scala formatting [scala] [streaming] Fixed scala formatting Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f7b6eaa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f7b6eaa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f7b6eaa Branch: refs/heads/release-0.8 Commit: 1f7b6eaaa979e91799a047f29881f82666c12b19 Parents: f7291ea Author: Gyula Fora Authored: Sun Dec 21 01:05:40 2014 +0100 Committer: mbalassi Committed: Mon Jan 5 17:58:54 2015 +0100 ---------------------------------------------------------------------- .../flink/api/scala/streaming/DataStream.scala | 93 +++++++++++++------- .../api/scala/streaming/FieldsKeySelector.scala | 2 +- .../api/scala/streaming/SplitDataStream.scala | 17 ++-- .../scala/streaming/StreamCrossOperator.scala | 25 ++++-- .../streaming/StreamExecutionEnvironment.scala | 16 ++-- .../scala/streaming/StreamJoinOperator.scala | 29 +++--- .../scala/streaming/WindowedDataStream.scala | 31 ++++--- 7 files changed, 136 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index ecf5615..0cf4a60 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -82,7 +82,8 @@ class DataStream[T](javaStream: JavaStream[T]) { javaStream match { case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop) case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot have " + + throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " + + "have " + "parallelism.") } this @@ -94,7 +95,8 @@ class DataStream[T](javaStream: JavaStream[T]) { def getParallelism: Int = javaStream match { case op: SingleOutputStreamOperator[_, _] => op.getParallelism case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have " + + throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" + + " " + "parallelism.") } @@ -139,7 +141,8 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * partitioned by the selected fields. This setting only effects the how the outputs will be + * distributed between the parallel instances of the next processing operator. * */ def partitionBy(fields: Int*): DataStream[T] = @@ -147,7 +150,8 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * partitioned by the selected fields. This setting only effects the how the outputs will be + * distributed between the parallel instances of the next processing operator. * */ def partitionBy(firstField: String, otherFields: String*): DataStream[T] = @@ -155,7 +159,8 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output is - * partitioned by the given Key. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * partitioned by the given Key. This setting only effects the how the outputs will be + * distributed between the parallel instances of the next processing operator. * */ def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = { @@ -222,7 +227,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * * */ - def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: Long = 0): DataStream[T] = { + def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: + Long = 0): DataStream[T] = { val iterativeStream = javaStream.iterate(maxWaitTimeMillis) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) @@ -252,19 +258,24 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Applies an aggregation that that gives the current minimum element of the data stream by - * the given position. When equality, the user can set to get the first or last element with the minimal value. + * the given position. When equality, the user can set to get the first or last element with + * the minimal value. * */ - def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first) + def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType + .MINBY, position, first) /** * Applies an aggregation that that gives the current maximum element of the data stream by - * the given position. When equality, the user can set to get the first or last element with the maximal value. + * the given position. When equality, the user can set to get the first or last element with + * the maximal value. * */ - def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first) + def maxBy(position: Int, first: Boolean = true): DataStream[T] = + aggregate(AggregationType.MAXBY, position, first) - private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = { + private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): + DataStream[T] = { val jStream = javaStream.asInstanceOf[JavaStream[Product]] val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] @@ -272,15 +283,18 @@ class DataStream[T](javaStream: JavaStream[T]) { val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) val reducer = aggregationType match { - case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass())); + case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position). + getTypeClass())); case _ => new agg.ProductComparableAggregator(aggregationType, first) } val invokable = jStream match { - case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, groupedStream.getKeySelector()) + case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, + groupedStream.getKeySelector()) case _ => new StreamReduceInvokable(reducer) } - new DataStream[Product](jStream.transform("aggregation", jStream.getType(), invokable)).asInstanceOf[DataStream[T]] + new DataStream[Product](jStream.transform("aggregation", jStream.getType(), + invokable)).asInstanceOf[DataStream[T]] } /** @@ -288,7 +302,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * received records. * */ - def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]] + def count: DataStream[Long] = new DataStream[java.lang.Long]( + javaStream.count()).asInstanceOf[DataStream[Long]] /** * Creates a new DataStream by applying the given function to every element of this DataStream. @@ -302,7 +317,8 @@ class DataStream[T](javaStream: JavaStream[T]) { def map(in: T): R = cleanFun(in) } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))) + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], + new MapInvokable[T, R](mapper))) } /** @@ -313,7 +329,8 @@ class DataStream[T](javaStream: JavaStream[T]) { throw new NullPointerException("Map function must not be null.") } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))) + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], + new MapInvokable[T, R](mapper))) } /** @@ -324,7 +341,8 @@ class DataStream[T](javaStream: JavaStream[T]) { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper))) + new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], + new FlatMapInvokable[T, R](flatMapper))) } /** @@ -358,22 +376,24 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce - * function. + * Creates a new [[DataStream]] by reducing the elements of this DataStream + * using an associative reduce function. */ def reduce(reducer: ReduceFunction[T]): DataStream[T] = { if (reducer == null) { throw new NullPointerException("Reduce function must not be null.") } javaStream match { - case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))) - case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new StreamReduceInvokable[T](reducer))) + case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", + javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))) + case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), + new StreamReduceInvokable[T](reducer))) } } /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce - * function. + * Creates a new [[DataStream]] by reducing the elements of this DataStream + * using an associative reduce function. */ def reduce(fun: (T, T) => T): DataStream[T] = { if (fun == null) { @@ -421,7 +441,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * the trigger and eviction policies please use to * window(List(triggers), List(evicters)) */ - def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(windowingHelper: _*)) + def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = + new WindowedDataStream[T](javaStream.window(windowingHelper: _*)) /** * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s. @@ -430,7 +451,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * use-cases please refer to window(WindowingHelper[_]*) * */ - def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters)) + def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): + WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters)) /** * @@ -473,7 +495,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * to use custom join function. * */ - def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream) + def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = + new StreamJoinOperator[T, R](javaStream, stream.getJavaStream) /** * Initiates a temporal cross transformation that builds all pair @@ -487,7 +510,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * to use custom join function. * */ - def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = new StreamCrossOperator[T, R](javaStream, stream.getJavaStream) + def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = + new StreamCrossOperator[T, R](javaStream, stream.getJavaStream) /** * Writes a DataStream to the standard output stream (stdout). For each @@ -504,7 +528,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * is written. * */ - def writeAsText(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis)) + def writeAsText(path: String, millis: Long = 0): DataStream[T] = + new DataStream[T](javaStream.writeAsText(path, millis)) /** * Writes a DataStream to the file specified by path in text format. The @@ -513,7 +538,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * is written. * */ - def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis)) + def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = + new DataStream[T](javaStream.writeAsCsv(path, millis)) /** * Adds the given sink to this DataStream. Only streams with sinks added @@ -521,7 +547,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * method is called. * */ - def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFuntion)) + def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = + new DataStream[T](javaStream.addSink(sinkFuntion)) /** * Adds the given sink to this DataStream. Only streams with sinks added @@ -540,4 +567,4 @@ class DataStream[T](javaStream: JavaStream[T]) { this.addSink(sinkFunction) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala index d7c9f96..b50d346 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala @@ -44,4 +44,4 @@ class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] { case _ => throw new RuntimeException("Only tuple types are supported") } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala index 0b0cce5..82a5c70 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -46,4 +47,4 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) { */ def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll()) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala index 5dfbc3b..5f579e5 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala @@ -35,11 +35,13 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.api.common.functions.CrossFunction -class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { +class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends + TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = { - val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, (l: I1, r: I2) => (l, r)) + val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, + (l: I1, r: I2) => (l, r)) val returnType = new CaseClassTypeInfo[(I1, I2)]( @@ -69,24 +71,31 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend } object StreamCrossOperator { - private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) { + private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], + javaStream: JavaStream[(I1, I2)]) extends + DataStream[(I1, I2)](javaStream) { /** - * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf call will be emitted. + * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf + * call will be emitted. * */ def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { val invokable = new CoWindowInvokable[I1, I2, R]( - clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) + clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, + op.timeStamp2) - javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable) + javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), + invokable) new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]])) } } - private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], crossFunction: (I1, I2) => R): CrossWindowFunction[I1, I2, R] = { + private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], + crossFunction: (I1, I2) => R): + CrossWindowFunction[I1, I2, R] = { Validate.notNull(crossFunction, "Join function must not be null.") val crossFun = new CrossFunction[I1, I2, R] { @@ -100,4 +109,4 @@ object StreamCrossOperator { new CrossWindowFunction[I1, I2, R](crossFun) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index 340ecc1..55f7c6c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -117,7 +117,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * */ def generateSequence(from: Long, to: Long): DataStream[Long] = { - new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).asInstanceOf[DataStream[Long]] + new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)). + asInstanceOf[DataStream[Long]] } /** @@ -147,7 +148,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { "elements", typeInfo); javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(), - new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))), null, typeInfo, + new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions + .asJavaCollection(data))), null, typeInfo, "source", 1); new DataStream(returnStream) } @@ -204,7 +206,8 @@ object StreamExecutionEnvironment { * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads). */ def createLocalEnvironment( - degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): StreamExecutionEnvironment = { + degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): + StreamExecutionEnvironment = { new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism)) } @@ -223,7 +226,8 @@ object StreamExecutionEnvironment { * those must be * provided in the JAR files. */ - def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = { + def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): + StreamExecutionEnvironment = { new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)) } @@ -251,4 +255,4 @@ object StreamExecutionEnvironment { javaEnv.setDegreeOfParallelism(degreeOfParallelism) new StreamExecutionEnvironment(javaEnv) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala index fff5e86..7a39da5 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala @@ -33,7 +33,8 @@ import scala.reflect.ClassTag import org.apache.commons.lang.Validate import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable -class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { +class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends +TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { override def createNextWindowOperator() = { new StreamJoinOperator.JoinWindow[I1, I2](this) @@ -61,7 +62,8 @@ object StreamJoinOperator { * to define the second key. */ def where(firstField: String, otherFields: String*) = { - new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*)) + new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), + (firstField +: otherFields): _*)) } /** @@ -82,7 +84,8 @@ object StreamJoinOperator { } - class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) { + class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], + private[flink] val keys1: KeySelector[I1, _]) { private[flink] var keys2: KeySelector[I2, _] = null /** @@ -145,30 +148,36 @@ object StreamJoinOperator { } } - return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), + return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) + .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) } } - class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) { + class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends + DataStream[(I1, I2)](javaStream) { private val op = jp.op /** - * Sets a wrapper for the joined elements. For each joined pair, the result of the udf call will be emitted. + * Sets a wrapper for the joined elements. For each joined pair, the result of the + * udf call will be emitted. */ def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { val invokable = new CoWindowInvokable[I1, I2, R]( - clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) + clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, + op.timeStamp2) - javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable) + javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), + invokable) new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]])) } } - private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction: (I1, I2) => R) = { + private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], + joinFunction: (I1, I2) => R) = { Validate.notNull(joinFunction, "Join function must not be null.") val joinFun = new JoinFunction[I1, I2, R] { @@ -183,4 +192,4 @@ object StreamJoinOperator { new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f7b6eaa/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala index c037305..8c763fc 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -52,7 +52,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * This controls how often the user defined function will be triggered on * the window. */ - def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.every(windowingHelper: _*)) + def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = + new WindowedDataStream[T](javaStream.every(windowingHelper: _*)) /** * Groups the elements of the WindowedDataStream using the given @@ -126,12 +127,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { /** * Applies a reduceGroup transformation on the windowed data stream by reducing - * the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface. + * the current window at every trigger. In contrast with the simple binary reduce operator, + * groupReduce exposes the whole window through the Iterable interface. *
*
* Whenever possible try to use reduce instead of groupReduce for increased efficiency */ - def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): DataStream[R] = { + def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): + DataStream[R] = { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null.") } @@ -140,12 +143,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { /** * Applies a reduceGroup transformation on the windowed data stream by reducing - * the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface. + * the current window at every trigger. In contrast with the simple binary reduce operator, + * groupReduce exposes the whole window through the Iterable interface. *
*
* Whenever possible try to use reduce instead of groupReduce for increased efficiency */ - def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): DataStream[R] = { + def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): + DataStream[R] = { if (fun == null) { throw new NullPointerException("GroupReduce function must not be null.") } @@ -181,16 +186,19 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * the given position. When equality, returns the first. * */ - def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first) + def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, + position, first) /** * Applies an aggregation that that gives the minimum element of the window by * the given position. When equality, returns the first. * */ - def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first) + def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, + position, first) - def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = { + def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): + DataStream[T] = { val jStream = javaStream.asInstanceOf[JavaWStream[Product]] val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] @@ -198,11 +206,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) val reducer = aggregationType match { - case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass())); + case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( + outType.getTypeAt(position).getTypeClass())); case _ => new agg.ProductComparableAggregator(aggregationType, first) } new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] } -} \ No newline at end of file +}