Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 71FCF18008 for ; Tue, 24 Nov 2015 03:13:11 +0000 (UTC) Received: (qmail 54647 invoked by uid 500); 24 Nov 2015 03:13:11 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 54595 invoked by uid 500); 24 Nov 2015 03:13:11 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 54580 invoked by uid 99); 24 Nov 2015 03:13:11 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Nov 2015 03:13:11 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 122912C14FB for ; Tue, 24 Nov 2015 03:13:11 +0000 (UTC) Date: Tue, 24 Nov 2015 03:13:11 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15023684#comment-15023684 ] ASF GitHub Bot commented on FLINK-1745: --------------------------------------- Github user danielblazevski commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r45692984 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, +DistanceMetric, EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector + +import org.apache.flink.ml.nn.util.QuadTree +import scala.collection.mutable.ListBuffer + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. + * + * Calculates the `k` nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K + * @param k the number of selected points as neighbors + */ + def setK(k: Int): KNN = { + require(k > 0, "K must be positive.") + parameters.add(K, k) + this + } + + /** Sets the distance metric + * @param metric the distance metric to calculate distance between two points + */ + def setDistanceMetric(metric: DistanceMetric): KNN = { + parameters.add(DistanceMetric, metric) + this + } + + /** Sets the number of data blocks/partitions + * @param n the number of data blocks + */ + def setBlocks(n: Int): KNN = { + require(n > 0, "Number of blocks must be positive.") + parameters.add(Blocks, n) + this + } + + /** + * Sets the Boolean variable that decides whether to use the QuadTree or not + */ + def setUseQuadTree(UseQuadTree: Boolean): KNN = { + parameters.add(UseQuadTreeParam, UseQuadTree) + this + } + + +} + +object KNN { + + case object K extends Parameter[Int] { + val defaultValue: Option[Int] = Some(5) + } + + case object DistanceMetric extends Parameter[DistanceMetric] { + val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric()) + } + + case object Blocks extends Parameter[Int] { + val defaultValue: Option[Int] = None + } + + case object UseQuadTreeParam extends Parameter[Boolean] { + val defaultValue: Option[Boolean] = None + } + + + def apply(): KNN = { + new KNN() + } + + /** [[FitOperation]] which trains a KNN based on the given training data set. + * @tparam T Subtype of [[org.apache.flink.ml.math.Vector]] + */ + + implicit def fitKNN[T <: FlinkVector : TypeInformation] = new FitOperation[KNN, T] { + override def fit( + instance: KNN, + fitParameters: ParameterMap, + input: DataSet[T]): Unit = { + val resultParameters = instance.parameters ++ fitParameters + + require(resultParameters.get(K).isDefined, "K is needed for calculation") + + val blocks = resultParameters.get(Blocks).getOrElse(input.getParallelism) + val partitioner = FlinkMLTools.ModuloKeyPartitioner + val inputAsVector = input.asInstanceOf[DataSet[FlinkVector]] + + instance.trainingSet = Some(FlinkMLTools.block(inputAsVector, blocks, Some(partitioner))) + } + } + + /** [[PredictDataSetOperation]] which calculates k-nearest neighbors of the given testing data + * set. + * @tparam T Subtype of [[Vector]] + * @return The given testing data set with k-nearest neighbors + */ + + implicit def predictValues[T <: FlinkVector : ClassTag : TypeInformation] = { + new PredictDataSetOperation[KNN, T, (FlinkVector, Array[FlinkVector])] { + override def predictDataSet( + instance: KNN, + predictParameters: ParameterMap, + input: DataSet[T]): + DataSet[(FlinkVector, Array[FlinkVector])] = { + val resultParameters = instance.parameters ++ predictParameters + + instance.trainingSet match { + case Some(trainingSet) => + val k = resultParameters.get(K).get + val blocks = resultParameters.get(Blocks).getOrElse(input.getParallelism) + val metric = resultParameters.get(DistanceMetric).get + val partitioner = FlinkMLTools.ModuloKeyPartitioner + + // attach unique id for each data + val inputWithId: DataSet[(Long, T)] = input.zipWithUniqueId + + // split data into multiple blocks + val inputSplit = FlinkMLTools.block(inputWithId, blocks, Some(partitioner)) + + // join input and training set + val crossed = trainingSet.cross(inputSplit).mapPartition { + (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => { + for ((training, testing) <- iter) { + val queue = mutable.PriorityQueue[(FlinkVector, FlinkVector, Long, Double)]()( + Ordering.by(_._4)) + + // use a quadtree if (4^dim)Ntest*log(Ntrain) + // < Ntest*Ntrain, and distance is Euclidean + val useQuadTree = resultParameters.get(UseQuadTreeParam).getOrElse( + training.values.head.size + math.log(math.log(training.values.length) / + math.log(4.0)) < math.log(training.values.length) / math.log(4.0) && + (metric.isInstanceOf[EuclideanDistanceMetric] || + metric.isInstanceOf[SquaredEuclideanDistanceMetric])) + + if (useQuadTree) { + knnQueryWithQuadTree(training.values.asInstanceOf[Vector[DenseVector]], + testing.values,k, metric,queue, out) + } else { + knnQueryBasic(training.values.asInstanceOf[Vector[DenseVector]], + testing.values,k, metric,queue, out) + } + } + } + } + + // group by input vector id and pick k nearest neighbor for each group + val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup { + (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => { + if (iter.hasNext) { + val head = iter.next() + val key = head._2 + val neighbors: ArrayBuffer[FlinkVector] = ArrayBuffer(head._1) + + for ((vector, _, _, _) <- iter.take(k - 1)) { + // we already took a first element + neighbors += vector + } + + out.collect(key, neighbors.toArray) + } + } + } + + result + case None => throw new RuntimeException("The KNN model has not been trained." + + "Call first fit before calling the predict operation.") + + } + } + } + } + + def knnQueryWithQuadTree[T <: FlinkVector](training: Vector[DenseVector], + testing: Vector[(Long, T)], + k: Int, metric: DistanceMetric, + queue: mutable.PriorityQueue[(FlinkVector, FlinkVector, Long, Double)], + out: Collector[(FlinkVector, FlinkVector, Long, Double)]) { + /// find a bounding box + val MinArr = List.range(0, training.head.size).toArray + val MaxArr = List.range(0, training.head.size).toArray + + val minVecTrain = MinArr.map(i => training.map(x => x(i)).min - 0.01) + val minVecTest = MinArr.map(i => testing.map(x => x._2(i)).min - 0.01) + val maxVecTrain = MaxArr.map(i => training.map(x => x(i)).min + 0.01) + val maxVecTest = MaxArr.map(i => testing.map(x => x._2(i)).min + 0.01) + + val MinVec = DenseVector(MinArr.map(i => Array(minVecTrain(i), minVecTest(i)).min)) + val MaxVec = DenseVector(MinArr.map(i => Array(maxVecTrain(i), maxVecTest(i)).max)) + + var trainingQuadTree = new QuadTree(MinVec, MaxVec, metric) + + if (trainingQuadTree.maxPerBox < k) { /// make sure at least k points/box + trainingQuadTree.maxPerBox = k + } + + for (v <- training) { + trainingQuadTree.insert(v.asInstanceOf[FlinkVector]) + } + + for (a <- testing) { + ///// Find siblings' objects and do local kNN there + val siblingObjects = + trainingQuadTree.searchNeighborsSiblingQueue( + a._2.asInstanceOf[FlinkVector]) --- End diff -- thanks @tillrohrmann, will update based on all your recent comments soon! > Add exact k-nearest-neighbours algorithm to machine learning library > -------------------------------------------------------------------- > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Till Rohrmann > Assignee: Daniel Blazevski > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial it is still used as a mean to classify data and to do regression. This issue focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf] -- This message was sent by Atlassian JIRA (v6.3.4#6332)