flink-commits mailing list archives

From mbala...@apache.org
Subject [24/36] flink git commit: [streaming] [scala] Restructured streaming scala project and examples
Date Wed, 07 Jan 2015 14:13:03 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
deleted file mode 100644
index bcd586e..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv }
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.commons.lang.Validate
-import scala.reflect.ClassTag
-import org.apache.flink.streaming.api.datastream.DataStreamSource
-import org.apache.flink.streaming.api.invokable.SourceInvokable
-import org.apache.flink.streaming.api.function.source.FromElementsFunction
-import org.apache.flink.streaming.api.function.source.SourceFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.util.Collector
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamExecutionEnvironment(javaEnv: JavaEnv) {
-
-  /**
-   * Sets the degree of parallelism (DOP) for operations executed through this environment.
-   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
-   * x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
-   */
-  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-  }
-
-  /**
-   * Returns the default degree of parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
-   */
-  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
-
-  /**
-   * Sets the maximum time frequency (milliseconds) for the flushing of the
-   * output buffers. By default the output buffers flush frequently to provide
-   * low latency and to aid smooth developer experience. Setting the parameter
-   * can result in three logical modes:
-   *
-   * <ul>
-   * <li>
-   * A positive integer triggers flushing periodically by that integer</li>
-   * <li>
-   * 0 triggers flushing after every record thus minimizing latency</li>
-   * <li>
-   * -1 triggers flushing only when the output buffer is full thus maximizing
-   * throughput</li>
-   * </ul>
-   *
-   */
-  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
-    javaEnv.setBufferTimeout(timeoutMillis)
-    this
-  }
-
-  /**
-   * Gets the default buffer timeout set for this environment
-   */
-  def getBufferTimout: Long = javaEnv.getBufferTimeout()
-
-  /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise. The file will be read with the system's default
-   * character set.
-   *
-   */
-  def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
-
-  /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise multiple times(infinite). The file will be read with
-   * the system's default character set. This functionality can be used for
-   * testing a topology.
-   *
-   */
-  def readTextStream(StreamPath: String): DataStream[String] = 
-    javaEnv.readTextStream(StreamPath)
-
-  /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set.
-   *
-   */
-  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port, delimiter)
-
-  /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set, uses '\n' as delimiter.
-   *
-   */
-  def socketTextStream(hostname: String, port: Int): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers.
-   *
-   */
-  def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type and must be serializable.
-   *
-   * * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromCollection[T: ClassTag: TypeInformation](
-    data: Seq[T]): DataStream[T] = {
-    Validate.notNull(data, "Data must not be null.")
-    val typeInfo = implicitly[TypeInformation[T]]
-
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-        .asJavaCollection(data))
-        
-    javaEnv.addSource(sourceFunction, typeInfo)
-  }
-
-  /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val cleanFun = StreamExecutionEnvironment.clean(function)
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun, typeInfo)
-  }
-  
-   /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val sourceFunction = new SourceFunction[T] {
-      val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def invoke(out: Collector[T]) {
-        cleanFun(out)
-      }
-    }
-    addSource(sourceFunction)
-  }
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with a generated
-   * default name.
-   *
-   */
-  def execute() = javaEnv.execute()
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with the provided name
-   *
-   */
-  def execute(jobName: String) = javaEnv.execute(jobName)
-
-}
-
-object StreamExecutionEnvironment {
-  
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
-  /**
-   * Creates an execution environment that represents the context in which the program is
-   * currently executed. If the program is invoked standalone, this method returns a local
-   * execution environment. If the program is invoked from within the command line client
-   * to be submitted to a cluster, this method returns the execution environment of this cluster.
-   */
-  def getExecutionEnvironment: StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
-  }
-
-  /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
-   */
-  def createLocalEnvironment(
-    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program to
-   * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program
-   * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified degree of parallelism.
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param degreeOfParallelism The degree of parallelism to use during the execution.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(
-    host: String,
-    port: Int,
-    degreeOfParallelism: Int,
-    jarFiles: String*): StreamExecutionEnvironment = {
-    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-    new StreamExecutionEnvironment(javaEnv)
-  }
-}
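
For orientation, a minimal usage sketch of the Scala wrapper removed above, using only methods visible in this hunk. The job name, parallelism, and element values are invented, and no sink is attached because none is defined in this file:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
    import org.apache.flink.api.scala.streaming.StreamingConversions._

    object RemovedEnvSketch {
      def main(args: Array[String]): Unit = {
        // Local or cluster context, depending on how the program is invoked.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setDegreeOfParallelism(2)

        // Non-parallel source (degree of parallelism of one) built from fixed elements.
        val numbers = env.fromElements(1L, 2L, 3L)

        // A sink would normally be attached to `numbers` before executing.

        // Executes all parts of the program that resulted in a sink operation.
        env.execute("removed-scala-streaming-sketch")
      }
    }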

http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
deleted file mode 100644
index 8d8a0b0..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.TemporalOperator
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.util.keys.KeySelectorUtil
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator() = {
-    new StreamJoinOperator.JoinWindow[I1, I2](this)
-  }
-}
-
-object StreamJoinOperator {
-
-  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
-
-    private[flink] val type1 = op.input1.getType();
-    
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(fields: Int*) = {
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type1),type1))
-    }
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(firstField: String, otherFields: String*) = 
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1))  
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the keyselector function that will be used to extract keys from the first stream
-     * for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where[K: TypeInformation](fun: (I1) => K) = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I1, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I1) = cleanFun(in)
-      }
-      new JoinPredicate[I1, I2](op, keyExtractor)
-    }
-
-  }
-
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
-                              private[flink] val keys1: KeySelector[I1, _]) {
-    private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType();
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
-      finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type2),type2))
-    }
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = 
-     finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2))
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I2, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I2) = cleanFun(in)
-      }
-      finish(keyExtractor)
-    }
-
-    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
-      this.keys2 = keys2
-      new JoinedStream[I1, I2](this, createJoinOperator())
-    }
-
-    private def createJoinOperator(): JavaStream[(I1, I2)] = {
-
-      val returnType = new CaseClassTypeInfo[(I1, I2)](
-
-        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
-
-        override def createSerializer: TypeSerializer[(I1, I2)] = {
-          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-          for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer
-          }
-
-          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-            override def createInstance(fields: Array[AnyRef]) = {
-              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-            }
-          }
-        }
-      }
-
-      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
-        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
-    }
-  }
-
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
-  DataStream[(I1, I2)](javaStream) {
-
-    private val op = jp.op
-
-    /**
-     * Sets a wrapper for the joined elements. For each joined pair, the result of the
-     * udf call will be emitted.
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
-        op.timeStamp2)
-
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
-        invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
-    }
-  }
-
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
-                                                      joinFunction: (I1, I2) => R) = {
-    Validate.notNull(joinFunction, "Join function must not be null.")
-
-    val joinFun = new JoinFunction[I1, I2, R] {
-
-      val cleanFun = jp.op.input1.clean(joinFunction)
-
-      override def join(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
-  }
-
-}
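
As a hedged illustration of the fluent join API deleted above: assuming a value `window` of type StreamJoinOperator.JoinWindow[(String, Int), (String, Double)] obtained from the join entry point (which is not part of this hunk), the where/equalTo/apply chain composes as follows; the key positions and the combining function are invented:

    // window: StreamJoinOperator.JoinWindow[(String, Int), (String, Double)]
    // (import org.apache.flink.api.scala._ is assumed in scope for the implicit TypeInformation)
    val joined: DataStream[(String, Double)] =
      window
        .where(0)                                   // key the first input by field 0
        .equalTo(0)                                 // key the second input by field 0
        .apply((left, right) => (left._1, left._2 * right._2)) // custom wrapping per pair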

http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
deleted file mode 100644
index 9aefa04..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
-
-object StreamingConversions {
-
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-
-  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
-    new WindowedDataStream[R](javaWStream)
-
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
-    new SplitDataStream[R](javaStream)
-
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
-  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
-
-}
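
The object deleted above only supplies implicit views from the Java stream types onto their Scala wrappers, which is what allows the wrapper methods to return the Java types directly. A small sketch of the effect, assuming `javaStream` is a JavaStream[String] produced by the underlying Java API:

    import org.apache.flink.api.scala.streaming.StreamingConversions._

    // With the implicits in scope, the Java stream can be used where the Scala
    // wrapper is expected, without writing new DataStream[String](javaStream).
    val scalaStream: DataStream[String] = javaStream   // javaToScalaStream applies here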

http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
deleted file mode 100644
index 2f9c792..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.streaming.api.invokable.StreamInvokable
-import scala.collection.JavaConversions._
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class WindowedDataStream[T](javaStream: JavaWStream[T]) {
-
-  /**
-   * Defines the slide size (trigger frequency) for the windowed data stream.
-   * This controls how often the user defined function will be triggered on
-   * the window.
-   */
-  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    javaStream.every(windowingHelper: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field positions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field expressions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
-    
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * KeySelector function. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.groupBy(keyExtractor)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream.reduce(reducer)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
-  DataStream[R] = {
-    if (reducer == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
-  DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    val reducer = new GroupReduceFunction[T, R] {
-      val cleanFun = clean(fun)
-      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
-    }
-    reduceGroup(reducer)
-  }
-
-  /**
-   * Applies an aggregation that that gives the maximum of the elements in the window at
-   * the given position.
-   *
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that that gives the minimum of the elements in the window at
-   * the given position.
-   *
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   *
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that that gives the maximum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position, first)
-
-  /**
-   * Applies an aggregation that that gives the minimum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
-    position, first)
-
-  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
-  DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
-    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
-
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
-        outType.getTypeAt(position).getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
-    }
-
-    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
-  }
-
-}
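
Connecting the pieces in the file deleted above: assuming `windowed` is a WindowedDataStream[(String, Int)] produced by a DataStream.window(...) call defined outside this hunk, a per-key windowed sum can be expressed either with the positional aggregation or with an explicit reduce. Field positions and the reduce function are illustrative:

    // windowed: WindowedDataStream[(String, Int)], from DataStream.window(...) elsewhere
    val viaAggregate: DataStream[(String, Int)] = windowed.groupBy(0).sum(1)

    val viaReduce: DataStream[(String, Int)] =
      windowed.groupBy(0).reduce((a, b) => (a._1, a._2 + b._2))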

http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
deleted file mode 100644
index b7d1546..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming.windowing
-
-import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.commons.lang.Validate
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
-
-object Delta {
-
-  /**
-   * Creates a delta helper representing a delta trigger or eviction policy.
-   * </br></br> This policy calculates a delta between the data point which
-   * triggered last and the currently arrived data point. It triggers if the
-   * delta is higher than a specified threshold. </br></br> In case it gets
-   * used for eviction, this policy starts from the first element of the
-   * buffer and removes all elements from the buffer which have a higher delta
-   * then the threshold. As soon as there is an element with a lower delta,
-   * the eviction stops.
-   */
-  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
-    Validate.notNull(deltaFunction, "Delta function must not be null")
-    val df = new DeltaFunction[T] {
-      val cleanFun = clean(deltaFunction)
-      override def getDelta(first: T, second: T) = cleanFun(first, second)
-    }
-    JavaDelta.of(threshold, df, initVal)
-  }
-
-}
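
A brief, hypothetical use of the helper deleted above: a delta policy over Double readings that fires once the absolute difference to the last triggering element exceeds 0.5, with 0.0 as the initial reference value:

    import org.apache.flink.api.scala.streaming.windowing.Delta

    val deltaPolicy = Delta.of[Double](0.5, (prev, cur) => math.abs(cur - prev), 0.0)
    // The returned JavaDelta can then be passed wherever a windowing helper is
    // expected, e.g. to DataStream.window(...) (not part of this hunk).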

http://git-wip-us.apache.org/repos/asf/flink/blob/bf85c008/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
deleted file mode 100644
index 62a47c2..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming.windowing
-
-import java.util.concurrent.TimeUnit
-import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.commons.net.ntp.TimeStamp
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.commons.lang.Validate
-
-object Time {
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using System time.
-   *
-   */
-  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
-    JavaTime.of(windowSize, timeUnit)
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using a user defined timestamp extractor.
-   *
-   */
-  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
-    Validate.notNull(timestamp, "Timestamp must not be null.")
-    val ts = new Timestamp[R] {
-      val fun = clean(timestamp, true)
-      override def getTimestamp(in: R) = fun(in)
-    }
-    JavaTime.of(windowSize, ts, startTime)
-  }
-
-}
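
And the corresponding sketch for the Time helper deleted above, with an invented event type to show the timestamp-extractor overload; the window sizes are illustrative:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.scala.streaming.windowing.Time

    // A window of 5 seconds measured in system time.
    val byWallClock = Time.of(5, TimeUnit.SECONDS)

    // A window of 5000 units measured by a user-defined timestamp extractor.
    case class Event(ts: Long, value: Double)
    val byEventTime = Time.of[Event](5000, (e: Event) => e.ts)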

