flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: DeltaIterations: Solution set delta does not depend on workset
Date Tue, 10 Jun 2014 13:34:26 GMT
Hi,
this might be a stupid question, but do you have code inside the dummy
methods in your actual implementation? If so, could you post it as well?
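For reference, the contract behind that exception is that in each superstep the
solution-set delta must be derived from the workset. Here is a rough local sketch of
that contract in plain Scala (ordinary collections only, not the Stratosphere API;
all names in it are made up for illustration):

```scala
// Local, single-machine sketch of a workset ("delta") iteration: each
// superstep computes a delta FROM the current workset, merges it into the
// solution set, and produces the next workset. It terminates when the
// workset is empty or maxIterations is reached.
def worksetIterate[S, W](
    initialSolution: Set[S],
    initialWorkset: Seq[W],
    maxIterations: Int
)(step: (Set[S], Seq[W]) => (Set[S], Seq[W])): Set[S] = {
  var solution = initialSolution
  var workset = initialWorkset
  var i = 0
  while (workset.nonEmpty && i < maxIterations) {
    val (delta, nextWorkset) = step(solution, workset)
    solution = solution ++ delta // merge the delta into the solution set
    workset = nextWorkset
    i += 1
  }
  solution
}

// Toy use, shaped like the tree growth: the solution set collects node ids
// level by level, and the workset holds (nodeId, value) records that move
// to child node 2k or 2k+1 in each superstep.
val result = worksetIterate(Set(1), Seq((1, 0.3), (1, 0.7)), 3) {
  (solution, workset) =>
    // The delta is derived from the workset: the child ids records move to.
    val delta = workset.map { case (id, v) =>
      if (v < 0.5) id * 2 else id * 2 + 1
    }.toSet
    val nextWorkset = workset.map { case (id, v) =>
      (if (v < 0.5) id * 2 else id * 2 + 1, v)
    }
    (delta, nextWorkset)
}
```

The point is the dataflow shape: each round's delta is computed from the current
workset, which is what the PACT compiler checks for structurally in your plan.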

Cheers,
Aljoscha


On Mon, Jun 9, 2014 at 5:10 PM, Christoph Viebig (lists) <
lists@christoph-viebig.de> wrote:

> Hi,
> in a university project at TU Berlin we are implementing the Random Forest
> algorithm on Stratosphere. We decided to use DeltaIterations to perform the
> computation. The algorithm grows a random tree (a sort of decision tree)
> iteratively. In each iteration a new level of the tree is created and the
> data is assigned to the corresponding child node in that level. The solution
> set is the set of nodes in the tree; it is appended to in each iteration.
> The workset is the set of data records; it is modified in each iteration, as
> each data record also stores its assigned node in the tree. Unfortunately we
> are experiencing a PACT compiler exception. Can you please give us some
> advice on how to resolve it? What is the solution set delta, and how should
> it depend on the workset? We append to the solution set in each iteration
> and use it to manipulate the workset.
>
> Thank you very much in advance!
>
>
> Best regards
> Christoph
>
> ```
> Exception in thread "main" eu.stratosphere.compiler.CompilerException: In the
> given plan, the solution set delta does not depend on the workset. This is a
> prerequisite in workset iterations.
>     at eu.stratosphere.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:1025)
>     at eu.stratosphere.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:777)
>     at eu.stratosphere.api.common.operators.DualInputOperator.accept(DualInputOperator.java:263)
>     at eu.stratosphere.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:180)
>     at eu.stratosphere.api.common.operators.GenericDataSink.accept(GenericDataSink.java:409)
>     at eu.stratosphere.api.common.Plan.accept(Plan.java:298)
>     at eu.stratosphere.compiler.PactCompiler.compile(PactCompiler.java:670)
>     at eu.stratosphere.compiler.PactCompiler.compile(PactCompiler.java:553)
>     at eu.stratosphere.client.LocalExecutor.executePlan(LocalExecutor.java:216)
>     at eu.stratosphere.client.LocalExecutor.execute(LocalExecutor.java:301)
>     at de.tu_berlin.impro3.stratosphere.classification.randomforest.RunRandomForest$.main(RandomForest.scala:220)
>     at de.tu_berlin.impro3.stratosphere.classification.randomforest.RunRandomForest.main(RandomForest.scala)
> ```
> Code:
> ```
> case class Node(id: Int, split: Split, maxLabel: String, leaf: Boolean = false) {}
>
> case class Split(dim: Int, value: Double) {}
>
> case class Point(features: Vector, label: String) {}
>
> case class Data(nodeId: Int, point: Point) {}
>
> /* Compute first node in tree. */
> def computeFirstNode(data: DataSet[Data]): DataSet[Node] = {
>   // Dummy.
>   val newNodes = Seq[Node]()
>   CollectionDataSource(newNodes)
> }
>
> /* Create new child nodes if a node has more than one class assigned to it.
>  * Find a good split within the data that is assigned.
>  */
> def splitNodes(data: DataSet[Data]): DataSet[Node] = {
>   // Dummy.
>   val newNodes = Seq[Node]()
>   CollectionDataSource(newNodes)
> }
>
> /* Assign data to the new nodes.
>  * If there is no child node k*2 or k*2+1 the data is omitted; otherwise it
>  * is assigned to the child node depending on the split.
>  */
> def reassignData(nodes: DataSet[Node], data: DataSet[Data]): DataSet[Data] = {
>   // Dummy. It modifies Data.nodeId for each Data instance.
>   (data)
> }
>
> /* Grow the tree and create a new level. */
> def growTree(s: DataSet[Node], ws: DataSet[Data]) = {
>   val newNodes = splitNodes(ws)
>   val newWorkSet = reassignData(newNodes, ws)
>   val newSolutionSet = s.union(newNodes)
>   (newSolutionSet, newWorkSet)
> }
>
> def getScalaPlan(numTrees: Int, maxDepth: Int, bootstrapRatio: Double,
>     inputFile: String, outputFile: String) = {
>
>   val points = DataSource(inputFile, CsvInputFormat[(Double, Double, Double,
>       String, String, String, String)]("\n", ','))
>     /* Create case objects from input data
>      * => (Point)
>      */
>     .map { case (x, y, z, _, _, _, label) =>
>       Point(new Vector(List[Double](x, y, z)), label)
>     }
>
>   val data = points
>     /* Sample data depending on bootstrapRatio and a random number
>      * (Point) => (Point)
>      */
>     .filter( point => Random.nextDouble < bootstrapRatio )
>     /* Initialize work set for delta iteration.
>      * First nodeId is 1 with depth 0.
>      * (Point) => (nodeId, Point)
>      */
>     .map { x => Data(1, x) }
>
>   /* Initialize tree. */
>   val initializedTree = computeFirstNode(data)
>
>   /* Perform delta iteration on data to grow a tree.
>    * Aborting when data is empty or maxDepth is reached.
>    * See https://github.com/stratosphere/stratosphere/blob/master/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala
>    * See https://github.com/stratosphere/stratosphere/blob/master/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/IterateOperators.scala
>    * (WorkSetIterateMacros)
>    */
>   val tree = initializedTree.iterateWithDelta(data, {_.id}, growTree, maxDepth)
>
>   /* Write tree (solution set) to disk. */
>   val output = tree.map { case (node) =>
>     (node.id, node.split.dim, node.split.value, node.maxLabel, node.leaf)
>   }.write(outputFile, CsvOutputFormat())
>
>   new ScalaPlan(Seq(output), "RandomForest")
> }
> ```
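One structural detail worth checking in the posted plan: `growTree` takes its delta
from `splitNodes`, but if the dummy `splitNodes` really returns a fresh
`CollectionDataSource` and never touches its input, the compiled dataflow contains no
edge from the workset to the delta, which would produce exactly this exception. The
difference can be sketched with plain local collections (these `Seq`-based stand-ins
and simplified case classes are illustrative only, not the Stratosphere API):

```scala
// Simplified stand-ins for the types in the posted code (NOT the real ones;
// Point is reduced to just a label to keep the sketch self-contained).
case class Split(dim: Int, value: Double)
case class Node(id: Int, split: Split, maxLabel: String, leaf: Boolean = false)
case class Point(label: String)
case class Data(nodeId: Int, point: Point)

// Dependency-BREAKING stub: the output is a fresh constant collection, so
// nothing connects it to `data` in a dataflow graph.
def splitNodesConstant(data: Seq[Data]): Seq[Node] = Seq[Node]()

// Dependency-PRESERVING stub: the (still empty) output is derived from
// `data`, so a dataflow compiler sees an edge from the workset to the delta.
def splitNodesDerived(data: Seq[Data]): Seq[Node] =
  data.filter(_ => false).map(d => Node(d.nodeId, Split(0, 0.0), d.point.label))
```

Both stubs return an empty result, but only the second keeps the plan's shape honest
while the real split logic is still being developed.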
