flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/4] flink git commit: [FLINK-2277] [scala api] Add flag to set delta iteration solution set to unmanaged
Date Tue, 11 Aug 2015 11:14:22 GMT
[FLINK-2277] [scala api] Add flag to set delta iteration solution set to unmanaged

This closes #1005


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f50ae26a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f50ae26a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f50ae26a

Branch: refs/heads/master
Commit: f50ae26a2fb4a0c7f5b390e2f0f5528be9f61730
Parents: b42fbf7
Author: Pieter-Jan Van Aeken <pieterjan.vanaeken@euranova.eu>
Authored: Mon Aug 10 15:16:08 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Aug 11 13:13:47 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    | 60 ++++++++++++++++++++
 1 file changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f50ae26a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 167aa26..207bc5d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1075,6 +1075,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *
    * Note: The syntax of delta iterations are very likely going to change soon.
    */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[Int],
+                                 solutionSetUnManaged: Boolean)(
+    stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+    val key = new ExpressionKeys[T](keyFields, javaSet.getType, false)
+
+    val iterativeSet = new DeltaIteration[T, R](
+      javaSet.getExecutionEnvironment,
+      javaSet.getType,
+      javaSet,
+      workset.javaSet,
+      key,
+      maxIterations)
+
+    iterativeSet.setSolutionSetUnManaged(solutionSetUnManaged)
+
+    val (newSolution, newWorkset) = stepFunction(
+      wrap(iterativeSet.getSolutionSet),
+      wrap(iterativeSet.getWorkset))
+    val result = iterativeSet.closeWith(newSolution.javaSet, newWorkset.javaSet)
+    wrap(result)
+  }
+
+  /**
+   * Creates a new DataSet by performing delta (or workset) iterations using the given step
+   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+   * The iteration step function gets the current solution set and workset and must output
the
+   * delta for the solution set and the workset for the next iteration.
+   *
+   * Note: The syntax of delta iterations are very likely going to change soon.
+   */
   def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String])(
     stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
 
@@ -1094,6 +1124,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     wrap(result)
   }
 
+  /**
+   * Creates a new DataSet by performing delta (or workset) iterations using the given step
+   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+   * The iteration step function gets the current solution set and workset and must output
the
+   * delta for the solution set and the workset for the next iteration.
+   *
+   * Note: The syntax of delta iterations are very likely going to change soon.
+   */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String],
+                                 solutionSetUnManaged: Boolean)(
+    stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+
+    val key = new ExpressionKeys[T](keyFields, javaSet.getType)
+    val iterativeSet = new DeltaIteration[T, R](
+      javaSet.getExecutionEnvironment,
+      javaSet.getType,
+      javaSet,
+      workset.javaSet,
+      key,
+      maxIterations)
+
+    iterativeSet.setSolutionSetUnManaged(solutionSetUnManaged)
+
+    val (newSolution, newWorkset) = stepFunction(
+      wrap(iterativeSet.getSolutionSet),
+      wrap(iterativeSet.getWorkset))
+    val result = iterativeSet.closeWith(newSolution.javaSet, newWorkset.javaSet)
+    wrap(result)
+  }
+
   // -------------------------------------------------------------------------------------------
   //  Custom Operators
   // -------------------------------------------------------------------------------------------


Mime
View raw message