flink-dev mailing list archives

From "Christoph Viebig (lists)" <li...@christoph-viebig.de>
Subject DeltaIterations: Solution set delta does not depend on workset
Date Mon, 09 Jun 2014 15:10:01 GMT
In a university project at TU Berlin we are implementing the Random Forest
algorithm on Stratosphere. We decided to use DeltaIterations to perform the
computation. The algorithm grows a random tree (a kind of decision tree)
iteratively. In each iteration a new level of the tree is created and the data
is assigned to the corresponding child node in that level. The solution set is
the set of nodes in the tree; new nodes are appended to it in each iteration.
The workset is the set of data records; it is modified in each iteration
because each record also stores its assigned node in the tree. Unfortunately
we are experiencing a PACT compiler exception. Can you please give us some
advice on how to resolve it? What exactly is the solution set delta, and how
should it depend on the workset? We append to the solution set in each
iteration and use it to manipulate the workset.

Thank you very much in advance!

Best regards


Exception in thread "main" eu.stratosphere.compiler.CompilerException: In the
given plan, the solution set delta does not depend on the workset. This is a
prerequisite in workset iterations.
at eu.stratosphere.api.common.Plan.accept(Plan.java:298)
at eu.stratosphere.compiler.PactCompiler.compile(PactCompiler.java:670)
at eu.stratosphere.compiler.PactCompiler.compile(PactCompiler.java:553)
at eu.stratosphere.client.LocalExecutor.executePlan(LocalExecutor.java:216)
at eu.stratosphere.client.LocalExecutor.execute(LocalExecutor.java:301)
case class Node(id: Int, split: Split, maxLabel: String, leaf: Boolean = false)

case class Split(dim: Int, value: Double) {}

case class Point(features: Vector, label: String) {}

case class Data(nodeId: Int, point: Point) {}

/* Compute first node in tree. */
def computeFirstNode(data: DataSet[Data]): DataSet[Node] = {
  // Dummy.
  val newNodes = Seq[Node]()
}

/* Create new child nodes if a node has more than one class assigned to it.
 * Find a good split within the data that is assigned. */
def splitNodes(data: DataSet[Data]): DataSet[Node] = {
  // Dummy.
  val newNodes = Seq[Node]()
}

/* Assign data to the new nodes.
 * If there is no child node k*2 or k*2+1, the data is omitted; otherwise it is
 * assigned to the child node depending on the split. */
def reassignData(nodes: DataSet[Node], data: DataSet[Data]): DataSet[Data] = {
  // Dummy. It modifies Data.nodeId for each Data instance.
}

/* Grow the tree and create a new level. */
def growTree(s: DataSet[Node], ws: DataSet[Data]) = {
  val newNodes = splitNodes(ws)
  val newWorkSet = reassignData(newNodes, ws)
  val newSolutionSet = s.union(newNodes)
  (newSolutionSet, newWorkSet)
}

def getScalaPlan(numTrees: Int, maxDepth: Int, bootstrapRatio: Double,
    inputFile: String, outputFile: String) = {

  val points = DataSource(inputFile, CsvInputFormat[(Double, Double, Double,
      String, String, String, String)]("\n", ','))
    /* Create case objects from input data
     * => (Point) */
    .map { case (x, y, z, _, _, _, label) =>
      Point(new Vector(List[Double](x, y, z)), label) }

  val data = points
    /* Sample data depending on bootstrapRatio and a random number
     * (Point) => (Point) */
    .filter( point => Random.nextDouble < bootstrapRatio )
    /* Initialize work set for delta iteration.
     * First nodeId is 1 with depth 0.
     * (Point) => (nodeId, Point) */
    .map { x => Data(1, x) }

  /* Initialize tree. */
  val initializedTree = computeFirstNode(data)

  /* Perform delta iteration on data to grow a tree.
   * Aborting when data is empty or maxDepth is reached.
   * See
   * See */
  val tree = initializedTree.iterateWithDelta(data, {_.id}, growTree, maxDepth)

  /* Write tree (solution set) to disk. */
  val output = tree.map { case (node) => (node.id, node.split.dim,
      node.split.value, node.maxLabel, node.leaf) }
    .write(outputFile, CsvOutputFormat())

  new ScalaPlan(Seq(output), "RandomForest")
}
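To make the intended iteration semantics concrete, here is a minimal plain-Scala sketch of the growTree step, without any Stratosphere dependency. The case class names mirror the plan above; the splitting heuristic (split dimension 0 at the mean, only when a node holds more than one label) is a hypothetical stand-in for the real split search. The point of the sketch is the data flow: the new solution-set elements (the new nodes) are computed from the workset, and the new workset is derived from those nodes.

```scala
case class Split(dim: Int, value: Double)
case class Node(id: Int, split: Split, maxLabel: String, leaf: Boolean = false)
case class Point(features: List[Double], label: String)
case class Data(nodeId: Int, point: Point)

// Hypothetical split search: for every node that still holds more than one
// label, split dimension 0 at the mean feature value of its records.
def splitNodes(ws: Seq[Data]): Seq[Node] =
  ws.groupBy(_.nodeId).collect {
    case (id, recs) if recs.map(_.point.label).distinct.size > 1 =>
      val mean = recs.map(_.point.features.head).sum / recs.size
      Node(id, Split(0, mean), recs.groupBy(_.point.label).maxBy(_._2.size)._1)
  }.toSeq

// Reassign each record of a split node k to child 2k (left of the split
// value) or 2k+1 (right); records of unsplit nodes are dropped.
def reassignData(nodes: Seq[Node], ws: Seq[Data]): Seq[Data] = {
  val byId = nodes.map(n => n.id -> n).toMap
  ws.flatMap { d =>
    byId.get(d.nodeId).map { n =>
      val left = d.point.features(n.split.dim) < n.split.value
      Data(if (left) d.nodeId * 2 else d.nodeId * 2 + 1, d.point)
    }
  }
}

// One growTree step: the new nodes (the solution-set delta) come from the
// workset, and the next workset is derived from those new nodes.
def growTree(s: Seq[Node], ws: Seq[Data]): (Seq[Node], Seq[Data]) = {
  val newNodes = splitNodes(ws)
  (s ++ newNodes, reassignData(newNodes, ws))
}
```

In this shape the delta (newNodes) is a function of the workset, which matches the dependency the PACT compiler exception says it expects.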