flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [16/36] flink git commit: [scala] [streaming] Added scala window helpers + timestamp rework for lambda support
Date Wed, 07 Jan 2015 14:12:55 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
new file mode 100644
index 0000000..b7d1546
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming.windowing
+
+import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
+
+object Delta {
+
+  /**
+   * Creates a delta helper representing a delta trigger or eviction policy.
+   *
+   * This policy calculates a delta between the data point which
+   * triggered last and the currently arrived data point. It triggers if the
+   * delta is higher than a specified threshold.
+   *
+   * In case it gets used for eviction, this policy starts from the first
+   * element of the buffer and removes all elements from the buffer which have
+   * a higher delta than the threshold. As soon as there is an element with a
+   * lower delta, the eviction stops.
+   *
+   * The Scala function is null-checked, passed through `clean`, and wrapped
+   * in a Java `DeltaFunction` whose `getDelta` delegates to it; that wrapper
+   * is what gets handed to the Java `Delta.of`.
+   *
+   * @tparam T type of the data points the delta function compares
+   * @param threshold the delta threshold, forwarded unchanged to the Java helper
+   * @param deltaFunction computes the delta between two data points; must not
+   *                      be null
+   * @param initVal forwarded unchanged to the Java helper; presumably the value
+   *                the first arriving element is compared against (TODO confirm
+   *                against the Java Delta helper)
+   * @return the Java delta helper produced by `JavaDelta.of`
+   */
+  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T]
= {
+    Validate.notNull(deltaFunction, "Delta function must not be null")
+    val df = new DeltaFunction[T] {
+      val cleanFun = clean(deltaFunction)
+      override def getDelta(first: T, second: T) = cleanFun(first, second)
+    }
+    JavaDelta.of(threshold, df, initVal)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
new file mode 100644
index 0000000..62a47c2
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming.windowing
+
+import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.commons.net.ntp.TimeStamp
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.commons.lang.Validate
+
+object Time {
+
+  /**
+   * Creates a helper representing a time trigger which triggers every given
+   * length (slide size) or a time eviction which evicts all elements older
+   * than length (window size) using System time.
+   *
+   * This overload simply delegates to the Java `Time.of(windowSize, timeUnit)`.
+   *
+   * @param windowSize the length (slide size for a trigger, window size for an
+   *                   eviction)
+   * @param timeUnit the unit in which `windowSize` is expressed
+   * @return the Java time helper produced by `JavaTime.of`
+   */
+  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
+    JavaTime.of(windowSize, timeUnit)
+
+  /**
+   * Creates a helper representing a time trigger which triggers every given
+   * length (slide size) or a time eviction which evicts all elements older
+   * than length (window size) using a user defined timestamp extractor.
+   *
+   * The Scala extractor is null-checked, passed through `clean`, and wrapped
+   * in a Java `Timestamp` whose `getTimestamp` delegates to it; that wrapper
+   * is what gets handed to the Java `Time.of`.
+   *
+   * @tparam R type of the elements the timestamp is extracted from
+   * @param windowSize the length (slide size for a trigger, window size for an
+   *                   eviction); presumably in the same unit as the values
+   *                   returned by `timestamp` (TODO confirm)
+   * @param timestamp extracts a timestamp from an element; must not be null
+   * @param startTime forwarded unchanged to the Java helper, defaults to 0;
+   *                  presumably the timestamp at which windowing starts
+   *                  (TODO confirm against the Java Time helper)
+   * @return the Java time helper produced by `JavaTime.of`
+   */
+  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R]
= {
+    Validate.notNull(timestamp, "Timestamp must not be null.")
+    val ts = new Timestamp[R] {
+      val fun = clean(timestamp, true)
+      override def getTimestamp(in: R) = fun(in)
+    }
+    JavaTime.of(windowSize, ts, startTime)
+  }
+
+}


Mime
View raw message