Return-Path: X-Original-To: apmail-mahout-commits-archive@www.apache.org Delivered-To: apmail-mahout-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DE15F17C43 for ; Wed, 1 Apr 2015 18:07:37 +0000 (UTC) Received: (qmail 93777 invoked by uid 500); 1 Apr 2015 18:07:34 -0000 Delivered-To: apmail-mahout-commits-archive@mahout.apache.org Received: (qmail 93634 invoked by uid 500); 1 Apr 2015 18:07:34 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 92158 invoked by uid 99); 1 Apr 2015 18:07:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Apr 2015 18:07:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7FA9E2F2D; Wed, 1 Apr 2015 18:07:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pat@apache.org To: commits@mahout.apache.org Date: Wed, 01 Apr 2015 18:07:54 -0000 Message-Id: <6b19f5adf67c4d9b92d43d51dcb1669a@git.apache.org> In-Reply-To: <431aecaf7b9c4e7d96a698db8c9fc00c@git.apache.org> References: <431aecaf7b9c4e7d96a698db8c9fc00c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86 http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java 
b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java new file mode 100644 index 0000000..a8fa091 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.distance; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.parameters.ClassParameter; +import org.apache.mahout.common.parameters.Parameter; +import org.apache.mahout.common.parameters.PathParameter; +import org.apache.mahout.math.Algebra; +import org.apache.mahout.math.CardinalityException; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.MatrixWritable; +import org.apache.mahout.math.SingularValueDecomposition; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +//See http://en.wikipedia.org/wiki/Mahalanobis_distance for details +public class MahalanobisDistanceMeasure implements DistanceMeasure { + + private Matrix inverseCovarianceMatrix; + private Vector meanVector; + + private ClassParameter vectorClass; + private ClassParameter matrixClass; + private List> parameters; + private Parameter inverseCovarianceFile; + private Parameter meanVectorFile; + + /*public MahalanobisDistanceMeasure(Vector meanVector,Matrix inputMatrix, boolean inversionNeeded) + { + this.meanVector=meanVector; + if (inversionNeeded) + setCovarianceMatrix(inputMatrix); + else + setInverseCovarianceMatrix(inputMatrix); + }*/ + + @Override + public void configure(Configuration jobConf) { + if (parameters == null) { + ParameteredGeneralizations.configureParameters(this, jobConf); + } + try { + if (inverseCovarianceFile.get() != null) { + FileSystem fs = 
FileSystem.get(inverseCovarianceFile.get().toUri(), jobConf); + MatrixWritable inverseCovarianceMatrix = + ClassUtils.instantiateAs((Class) matrixClass.get(), MatrixWritable.class); + if (!fs.exists(inverseCovarianceFile.get())) { + throw new FileNotFoundException(inverseCovarianceFile.get().toString()); + } + DataInputStream in = fs.open(inverseCovarianceFile.get()); + try { + inverseCovarianceMatrix.readFields(in); + } finally { + Closeables.close(in, true); + } + this.inverseCovarianceMatrix = inverseCovarianceMatrix.get(); + Preconditions.checkArgument(this.inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized"); + } + + if (meanVectorFile.get() != null) { + FileSystem fs = FileSystem.get(meanVectorFile.get().toUri(), jobConf); + VectorWritable meanVector = + ClassUtils.instantiateAs((Class) vectorClass.get(), VectorWritable.class); + if (!fs.exists(meanVectorFile.get())) { + throw new FileNotFoundException(meanVectorFile.get().toString()); + } + DataInputStream in = fs.open(meanVectorFile.get()); + try { + meanVector.readFields(in); + } finally { + Closeables.close(in, true); + } + this.meanVector = meanVector.get(); + Preconditions.checkArgument(this.meanVector != null, "meanVector not initialized"); + } + + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + public Collection> getParameters() { + return parameters; + } + + @Override + public void createParameters(String prefix, Configuration jobConf) { + parameters = Lists.newArrayList(); + inverseCovarianceFile = new PathParameter(prefix, "inverseCovarianceFile", jobConf, null, + "Path on DFS to a file containing the inverse covariance matrix."); + parameters.add(inverseCovarianceFile); + + matrixClass = new ClassParameter(prefix, "maxtrixClass", jobConf, DenseMatrix.class, + "Class file specified in parameter inverseCovarianceFile has been serialized with."); + parameters.add(matrixClass); + + meanVectorFile = new PathParameter(prefix, 
"meanVectorFile", jobConf, null, + "Path on DFS to a file containing the mean Vector."); + parameters.add(meanVectorFile); + + vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class, + "Class file specified in parameter meanVectorFile has been serialized with."); + parameters.add(vectorClass); + } + + /** + * @param v The vector to compute the distance to + * @return Mahalanobis distance of a multivariate vector + */ + public double distance(Vector v) { + return Math.sqrt(v.minus(meanVector).dot(Algebra.mult(inverseCovarianceMatrix, v.minus(meanVector)))); + } + + @Override + public double distance(Vector v1, Vector v2) { + if (v1.size() != v2.size()) { + throw new CardinalityException(v1.size(), v2.size()); + } + return Math.sqrt(v1.minus(v2).dot(Algebra.mult(inverseCovarianceMatrix, v1.minus(v2)))); + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO + } + + public void setInverseCovarianceMatrix(Matrix inverseCovarianceMatrix) { + Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized"); + this.inverseCovarianceMatrix = inverseCovarianceMatrix; + } + + + /** + * Computes the inverse covariance from the input covariance matrix given in input. + * + * @param m A covariance matrix. + * @throws IllegalArgumentException if eigen values equal to 0 found. 
+ */ + public void setCovarianceMatrix(Matrix m) { + if (m.numRows() != m.numCols()) { + throw new CardinalityException(m.numRows(), m.numCols()); + } + // See http://www.mlahanas.de/Math/svd.htm for details, + // which specifically details the case of covariance matrix inversion + // Complexity: O(min(nm2,mn2)) + SingularValueDecomposition svd = new SingularValueDecomposition(m); + Matrix sInv = svd.getS(); + // Inverse Diagonal Elems + for (int i = 0; i < sInv.numRows(); i++) { + double diagElem = sInv.get(i, i); + if (diagElem > 0.0) { + sInv.set(i, i, 1 / diagElem); + } else { + throw new IllegalStateException("Eigen Value equals to 0 found."); + } + } + inverseCovarianceMatrix = svd.getU().times(sInv.times(svd.getU().transpose())); + Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized"); + } + + public Matrix getInverseCovarianceMatrix() { + return inverseCovarianceMatrix; + } + + public void setMeanVector(Vector meanVector) { + Preconditions.checkArgument(meanVector != null, "meanVector not initialized"); + this.meanVector = meanVector; + } + + public Vector getMeanVector() { + return meanVector; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java new file mode 100644 index 0000000..5c32fcf --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.distance; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.mahout.common.parameters.Parameter; +import org.apache.mahout.math.CardinalityException; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.Functions; + +/** + * This class implements a "manhattan distance" metric by summing the absolute values of the difference + * between each coordinate + */ +public class ManhattanDistanceMeasure implements DistanceMeasure { + + public static double distance(double[] p1, double[] p2) { + double result = 0.0; + for (int i = 0; i < p1.length; i++) { + result += Math.abs(p2[i] - p1[i]); + } + return result; + } + + @Override + public void configure(Configuration job) { + // nothing to do + } + + @Override + public Collection> getParameters() { + return Collections.emptyList(); + } + + @Override + public void createParameters(String prefix, Configuration jobConf) { + // nothing to do + } + + @Override + public double distance(Vector v1, Vector v2) { + if (v1.size() != v2.size()) { + throw new CardinalityException(v1.size(), v2.size()); + } + return v1.aggregate(v2, Functions.PLUS, Functions.MINUS_ABS); + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO + } +} 
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java new file mode 100644 index 0000000..3a57f2f --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.distance; + +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.mahout.common.parameters.DoubleParameter; +import org.apache.mahout.common.parameters.Parameter; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.Functions; + +/** + * Implement Minkowski distance, a real-valued generalization of the + * integral L(n) distances: Manhattan = L1, Euclidean = L2. + * For high numbers of dimensions, very high exponents give more useful distances. 
+ * + * Note: Math.pow is clever about integer-valued doubles. + **/ +public class MinkowskiDistanceMeasure implements DistanceMeasure { + + private static final double EXPONENT = 3.0; + + private List> parameters; + private double exponent = EXPONENT; + + public MinkowskiDistanceMeasure() { + } + + public MinkowskiDistanceMeasure(double exponent) { + this.exponent = exponent; + } + + @Override + public void createParameters(String prefix, Configuration conf) { + parameters = Lists.newArrayList(); + Parameter param = + new DoubleParameter(prefix, "exponent", conf, EXPONENT, "Exponent for Fractional Lagrange distance"); + parameters.add(param); + } + + @Override + public Collection> getParameters() { + return parameters; + } + + @Override + public void configure(Configuration jobConf) { + if (parameters == null) { + ParameteredGeneralizations.configureParameters(this, jobConf); + } + } + + public double getExponent() { + return exponent; + } + + public void setExponent(double exponent) { + this.exponent = exponent; + } + + /** + * Math.pow is clever about integer-valued doubles + */ + @Override + public double distance(Vector v1, Vector v2) { + return Math.pow(v1.aggregate(v2, Functions.PLUS, Functions.minusAbsPow(exponent)), 1.0 / exponent); + } + + // TODO: how? + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO - can this use centroidLengthSquare somehow? 
+ } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java new file mode 100644 index 0000000..66da121 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.distance; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.mahout.common.parameters.Parameter; +import org.apache.mahout.math.Vector; + +/** + * Like {@link EuclideanDistanceMeasure} but it does not take the square root. + *

+ * Thus, it is not actually the Euclidean Distance, but it is saves on computation when you only need the + * distance for comparison and don't care about the actual value as a distance. + */ +public class SquaredEuclideanDistanceMeasure implements DistanceMeasure { + + @Override + public void configure(Configuration job) { + // nothing to do + } + + @Override + public Collection> getParameters() { + return Collections.emptyList(); + } + + @Override + public void createParameters(String prefix, Configuration jobConf) { + // nothing to do + } + + @Override + public double distance(Vector v1, Vector v2) { + return v2.getDistanceSquared(v1); + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return centroidLengthSquare - 2 * v.dot(centroid) + v.getLengthSquared(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java new file mode 100644 index 0000000..cfeb119 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.distance; + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.Functions; + +/** + * Tanimoto coefficient implementation. + * + * http://en.wikipedia.org/wiki/Jaccard_index + */ +public class TanimotoDistanceMeasure extends WeightedDistanceMeasure { + + /** + * Calculates the distance between two vectors. + * + * The coefficient (a measure of similarity) is: T(a, b) = a.b / (|a|^2 + |b|^2 - a.b) + * + * The distance d(a,b) = 1 - T(a,b) + * + * @return 0 for perfect match, > 0 for greater distance + */ + @Override + public double distance(Vector a, Vector b) { + double ab; + double denominator; + if (getWeights() != null) { + ab = a.times(b).aggregate(getWeights(), Functions.PLUS, Functions.MULT); + denominator = a.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT) + + b.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT) + - ab; + } else { + ab = b.dot(a); // b is SequentialAccess + denominator = a.getLengthSquared() + b.getLengthSquared() - ab; + } + + if (denominator < ab) { // correct for fp round-off: distance >= 0 + denominator = ab; + } + if (denominator > 0) { + // denominator == 0 only when dot(a,a) == dot(b,b) == dot(a,b) == 0 + return 1.0 - ab / denominator; + } else { + return 0.0; + } + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO + } + +} 
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java new file mode 100644 index 0000000..0c1d2cd --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.distance; + +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.parameters.ClassParameter; +import org.apache.mahout.common.parameters.Parameter; +import org.apache.mahout.common.parameters.PathParameter; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +/** Abstract implementation of DistanceMeasure with support for weights. */ +public abstract class WeightedDistanceMeasure implements DistanceMeasure { + + private List> parameters; + private Parameter weightsFile; + private ClassParameter vectorClass; + private Vector weights; + + @Override + public void createParameters(String prefix, Configuration jobConf) { + parameters = Lists.newArrayList(); + weightsFile = new PathParameter(prefix, "weightsFile", jobConf, null, + "Path on DFS to a file containing the weights."); + parameters.add(weightsFile); + vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class, + "Class file specified in parameter weightsFile has been serialized with."); + parameters.add(vectorClass); + } + + @Override + public Collection> getParameters() { + return parameters; + } + + @Override + public void configure(Configuration jobConf) { + if (parameters == null) { + ParameteredGeneralizations.configureParameters(this, jobConf); + } + try { + if (weightsFile.get() != null) { + FileSystem fs = FileSystem.get(weightsFile.get().toUri(), jobConf); + VectorWritable weights = + ClassUtils.instantiateAs((Class) vectorClass.get(), VectorWritable.class); + if 
(!fs.exists(weightsFile.get())) { + throw new FileNotFoundException(weightsFile.get().toString()); + } + DataInputStream in = fs.open(weightsFile.get()); + try { + weights.readFields(in); + } finally { + Closeables.close(in, true); + } + this.weights = weights.get(); + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + public Vector getWeights() { + return weights; + } + + public void setWeights(Vector weights) { + this.weights = weights; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java new file mode 100644 index 0000000..c6889e2 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.distance; + + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; + +/** + * This class implements a Euclidean distance metric by summing the square root of the squared differences + * between each coordinate, optionally adding weights. + */ +public class WeightedEuclideanDistanceMeasure extends WeightedDistanceMeasure { + + @Override + public double distance(Vector p1, Vector p2) { + double result = 0; + Vector res = p2.minus(p1); + Vector theWeights = getWeights(); + if (theWeights == null) { + for (Element elt : res.nonZeroes()) { + result += elt.get() * elt.get(); + } + } else { + for (Element elt : res.nonZeroes()) { + result += elt.get() * elt.get() * theWeights.get(elt.index()); + } + } + return Math.sqrt(result); + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java new file mode 100644 index 0000000..2c280e2 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.distance; + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; + +/** + * This class implements a "Manhattan distance" metric by summing the absolute values of the difference + * between each coordinate, optionally with weights. + */ +public class WeightedManhattanDistanceMeasure extends WeightedDistanceMeasure { + + @Override + public double distance(Vector p1, Vector p2) { + double result = 0; + + Vector res = p2.minus(p1); + if (getWeights() == null) { + for (Element elt : res.nonZeroes()) { + result += Math.abs(elt.get()); + } + + } else { + for (Element elt : res.nonZeroes()) { + result += Math.abs(elt.get() * getWeights().get(elt.index())); + } + } + + return result; + } + + @Override + public double distance(double centroidLengthSquare, Vector centroid, Vector v) { + return distance(centroid, v); // TODO + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java new file mode 100644 index 0000000..73cc821 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; + +import com.google.common.base.Function; +import com.google.common.collect.ForwardingIterator; +import com.google.common.collect.Iterators; + +/** + * An iterator that copies the values in an underlying iterator by finding an appropriate copy constructor. 
+ */ +public final class CopyConstructorIterator extends ForwardingIterator { + + private final Iterator delegate; + private Constructor constructor; + + public CopyConstructorIterator(Iterator copyFrom) { + this.delegate = Iterators.transform( + copyFrom, + new Function() { + @Override + public T apply(T from) { + if (constructor == null) { + Class elementClass = (Class) from.getClass(); + try { + constructor = elementClass.getConstructor(elementClass); + } catch (NoSuchMethodException e) { + throw new IllegalStateException(e); + } + } + try { + return constructor.newInstance(from); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException(e); + } + } + }); + } + + @Override + protected Iterator delegate() { + return delegate; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java new file mode 100644 index 0000000..658c1f1 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import com.google.common.collect.AbstractIterator; + +/** + * Iterates over the integers from 0 through {@code to-1}. + */ +public final class CountingIterator extends AbstractIterator { + + private int count; + private final int to; + + public CountingIterator(int to) { + this.to = to; + } + + @Override + protected Integer computeNext() { + if (count < to) { + return count++; + } else { + return endOfData(); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java new file mode 100644 index 0000000..cfc18d6 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Iterator; + +import com.google.common.base.Charsets; + +/** + * Iterable representing the lines of a text file. It can produce an {@link Iterator} over those lines. This + * assumes the text file's lines are delimited in a manner consistent with how {@link java.io.BufferedReader} + * defines lines. + * + */ +public final class FileLineIterable implements Iterable { + + private final InputStream is; + private final Charset encoding; + private final boolean skipFirstLine; + private final String origFilename; + + /** Creates a {@link FileLineIterable} over a given file, assuming a UTF-8 encoding. */ + public FileLineIterable(File file) throws IOException { + this(file, Charsets.UTF_8, false); + } + + /** Creates a {@link FileLineIterable} over a given file, assuming a UTF-8 encoding. */ + public FileLineIterable(File file, boolean skipFirstLine) throws IOException { + this(file, Charsets.UTF_8, skipFirstLine); + } + + /** Creates a {@link FileLineIterable} over a given file, using the given encoding. 
*/ + public FileLineIterable(File file, Charset encoding, boolean skipFirstLine) throws IOException { + this(FileLineIterator.getFileInputStream(file), encoding, skipFirstLine); + } + + public FileLineIterable(InputStream is) { + this(is, Charsets.UTF_8, false); + } + + public FileLineIterable(InputStream is, boolean skipFirstLine) { + this(is, Charsets.UTF_8, skipFirstLine); + } + + public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine) { + this.is = is; + this.encoding = encoding; + this.skipFirstLine = skipFirstLine; + this.origFilename = ""; + } + + public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine, String filename) { + this.is = is; + this.encoding = encoding; + this.skipFirstLine = skipFirstLine; + this.origFilename = filename; + } + + + @Override + public Iterator iterator() { + try { + return new FileLineIterator(is, encoding, skipFirstLine, this.origFilename); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java new file mode 100644 index 0000000..b7cc51e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipInputStream; + +import com.google.common.base.Charsets; +import com.google.common.collect.AbstractIterator; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import org.apache.mahout.cf.taste.impl.common.SkippingIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iterates over the lines of a text file. This assumes the text file's lines are delimited in a manner + * consistent with how {@link BufferedReader} defines lines. + *

+ * This class will uncompress files that end in .zip or .gz accordingly, too. + */ +public final class FileLineIterator extends AbstractIterator implements SkippingIterator, Closeable { + + private final BufferedReader reader; + + private static final Logger log = LoggerFactory.getLogger(FileLineIterator.class); + + /** + * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding. + * + * @throws java.io.FileNotFoundException if the file does not exist + * @throws IOException + * if the file cannot be read + */ + public FileLineIterator(File file) throws IOException { + this(file, Charsets.UTF_8, false); + } + + /** + * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding. + * + * @throws java.io.FileNotFoundException if the file does not exist + * @throws IOException if the file cannot be read + */ + public FileLineIterator(File file, boolean skipFirstLine) throws IOException { + this(file, Charsets.UTF_8, skipFirstLine); + } + + /** + * Creates a {@link FileLineIterator} over a given file, using the given encoding. 
+ * + * @throws java.io.FileNotFoundException if the file does not exist + * @throws IOException if the file cannot be read + */ + public FileLineIterator(File file, Charset encoding, boolean skipFirstLine) throws IOException { + this(getFileInputStream(file), encoding, skipFirstLine); + } + + public FileLineIterator(InputStream is) throws IOException { + this(is, Charsets.UTF_8, false); + } + + public FileLineIterator(InputStream is, boolean skipFirstLine) throws IOException { + this(is, Charsets.UTF_8, skipFirstLine); + } + + public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine) throws IOException { + reader = new BufferedReader(new InputStreamReader(is, encoding)); + if (skipFirstLine) { + reader.readLine(); + } + } + + public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine, String filename) + throws IOException { + InputStream compressedInputStream; + + if ("gz".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) { + compressedInputStream = new GZIPInputStream(is); + } else if ("zip".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) { + compressedInputStream = new ZipInputStream(is); + } else { + compressedInputStream = is; + } + + reader = new BufferedReader(new InputStreamReader(compressedInputStream, encoding)); + if (skipFirstLine) { + reader.readLine(); + } + } + + static InputStream getFileInputStream(File file) throws IOException { + InputStream is = new FileInputStream(file); + String name = file.getName(); + if ("gz".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) { + return new GZIPInputStream(is); + } else if ("zip".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) { + return new ZipInputStream(is); + } else { + return is; + } + } + + @Override + protected String computeNext() { + String line; + try { + line = reader.readLine(); + } catch (IOException ioe) { + try { + close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + 
throw new IllegalStateException(ioe); + } + return line == null ? endOfData() : line; + } + + + @Override + public void skip(int n) { + try { + for (int i = 0; i < n; i++) { + if (reader.readLine() == null) { + break; + } + } + } catch (IOException ioe) { + try { + close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void close() throws IOException { + endOfData(); + Closeables.close(reader, true); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java new file mode 100644 index 0000000..1905654 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.iterator; + +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import com.google.common.collect.ForwardingIterator; +import com.google.common.collect.Lists; +import org.apache.mahout.common.RandomUtils; + +/** + * Sample a fixed number of elements from an Iterator. The results can appear in any order. + */ +public final class FixedSizeSamplingIterator extends ForwardingIterator { + + private final Iterator delegate; + + public FixedSizeSamplingIterator(int size, Iterator source) { + List buf = Lists.newArrayListWithCapacity(size); + int sofar = 0; + Random random = RandomUtils.getRandom(); + while (source.hasNext()) { + T v = source.next(); + sofar++; + if (buf.size() < size) { + buf.add(v); + } else { + int position = random.nextInt(sofar); + if (position < buf.size()) { + buf.set(position, v); + } + } + } + delegate = buf.iterator(); + } + + @Override + protected Iterator delegate() { + return delegate; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java new file mode 100644 index 0000000..46ef411 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.util.Iterator; + +/** + * Wraps an {@link Iterable} whose {@link Iterable#iterator()} returns only some subset of the elements that + * it would, as determined by a iterator rate parameter. + */ +public final class SamplingIterable implements Iterable { + + private final Iterable delegate; + private final double samplingRate; + + public SamplingIterable(Iterable delegate, double samplingRate) { + this.delegate = delegate; + this.samplingRate = samplingRate; + } + + @Override + public Iterator iterator() { + return new SamplingIterator(delegate.iterator(), samplingRate); + } + + public static Iterable maybeWrapIterable(Iterable delegate, double samplingRate) { + return samplingRate >= 1.0 ? delegate : new SamplingIterable(delegate, samplingRate); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java new file mode 100644 index 0000000..2ba46fd --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.util.Iterator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.AbstractIterator; +import org.apache.commons.math3.distribution.PascalDistribution; +import org.apache.mahout.cf.taste.impl.common.SkippingIterator; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.RandomWrapper; + +/** + * Wraps an {@link Iterator} and returns only some subset of the elements that it would, as determined by a + * iterator rate parameter. + */ +public final class SamplingIterator extends AbstractIterator { + + private final PascalDistribution geometricDistribution; + private final Iterator delegate; + + public SamplingIterator(Iterator delegate, double samplingRate) { + this(RandomUtils.getRandom(), delegate, samplingRate); + } + + public SamplingIterator(RandomWrapper random, Iterator delegate, double samplingRate) { + Preconditions.checkNotNull(delegate); + Preconditions.checkArgument(samplingRate > 0.0 && samplingRate <= 1.0, + "Must be: 0.0 < samplingRate <= 1.0. 
But samplingRate = " + samplingRate); + // Geometric distribution is special case of negative binomial (aka Pascal) with r=1: + geometricDistribution = new PascalDistribution(random.getRandomGenerator(), 1, samplingRate); + this.delegate = delegate; + } + + @Override + protected T computeNext() { + int toSkip = geometricDistribution.sample(); + if (delegate instanceof SkippingIterator) { + SkippingIterator skippingDelegate = (SkippingIterator) delegate; + skippingDelegate.skip(toSkip); + if (skippingDelegate.hasNext()) { + return skippingDelegate.next(); + } + } else { + for (int i = 0; i < toSkip && delegate.hasNext(); i++) { + delegate.next(); + } + if (delegate.hasNext()) { + return delegate.next(); + } + } + return endOfData(); + } + + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java new file mode 100644 index 0000000..c4ddf7b --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import com.google.common.base.Function; +import com.google.common.collect.ForwardingIterator; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.RandomUtils; + +/** + * Sample a fixed number of elements from an Iterator. The results will appear in the original order at some + * cost in time and memory relative to a FixedSizeSampler. 
+ */ +public class StableFixedSizeSamplingIterator extends ForwardingIterator { + + private final Iterator delegate; + + public StableFixedSizeSamplingIterator(int size, Iterator source) { + List> buf = Lists.newArrayListWithCapacity(size); + int sofar = 0; + Random random = RandomUtils.getRandom(); + while (source.hasNext()) { + T v = source.next(); + sofar++; + if (buf.size() < size) { + buf.add(new Pair<>(sofar, v)); + } else { + int position = random.nextInt(sofar); + if (position < buf.size()) { + buf.set(position, new Pair<>(sofar, v)); + } + } + } + + Collections.sort(buf); + delegate = Iterators.transform(buf.iterator(), + new Function,T>() { + @Override + public T apply(Pair from) { + return from.getSecond(); + } + }); + } + + @Override + protected Iterator delegate() { + return delegate; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java new file mode 100644 index 0000000..73b841e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; + +import com.google.common.base.Function; +import com.google.common.collect.ForwardingIterator; +import com.google.common.collect.Iterators; +import org.apache.mahout.common.Pair; + +public class StringRecordIterator extends ForwardingIterator,Long>> { + + private static final Long ONE = 1L; + + private final Pattern splitter; + private final Iterator,Long>> delegate; + + public StringRecordIterator(Iterable stringIterator, String pattern) { + this.splitter = Pattern.compile(pattern); + delegate = Iterators.transform( + stringIterator.iterator(), + new Function,Long>>() { + @Override + public Pair,Long> apply(String from) { + String[] items = splitter.split(from); + return new Pair<>(Arrays.asList(items), ONE); + } + }); + } + + @Override + protected Iterator,Long>> delegate() { + return delegate; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java new file mode 100644 index 0000000..19f78b5 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator.sequencefile; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * Supplies some useful and repeatedly-used instances of {@link PathFilter}. + */ +public final class PathFilters { + + private static final PathFilter PART_FILE_INSTANCE = new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return name.startsWith("part-") && !name.endsWith(".crc"); + } + }; + + /** + * Pathfilter to read the final clustering file. + */ + private static final PathFilter CLUSTER_FINAL = new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return name.startsWith("clusters-") && name.endsWith("-final"); + } + }; + + private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return !(name.endsWith(".crc") || name.startsWith(".") || name.startsWith("_")); + } + }; + + private PathFilters() { + } + + /** + * @return {@link PathFilter} that accepts paths whose file name starts with "part-". Excludes + * ".crc" files. 
+ */ + public static PathFilter partFilter() { + return PART_FILE_INSTANCE; + } + + /** + * @return {@link PathFilter} that accepts paths whose file name starts with "part-" and ends with "-final". + */ + public static PathFilter finalPartFilter() { + return CLUSTER_FINAL; + } + + /** + * @return {@link PathFilter} that rejects paths whose file name starts with "_" (e.g. Cloudera + * _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc" + */ + public static PathFilter logsCRCFilter() { + return LOGS_CRC_INSTANCE; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java new file mode 100644 index 0000000..7ea713e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.iterator.sequencefile; + +/** + * Used by {@link SequenceFileDirIterable} and the like to select whether the input path specifies a + * directory to list, or a glob pattern. + */ +public enum PathType { + GLOB, + LIST, +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java new file mode 100644 index 0000000..ca4d6b8 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.iterator.sequencefile; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.Pair; + +/** + *

{@link Iterable} counterpart to {@link SequenceFileDirIterator}.

+ */ +public final class SequenceFileDirIterable implements Iterable> { + + private final Path path; + private final PathType pathType; + private final PathFilter filter; + private final Comparator ordering; + private final boolean reuseKeyValueInstances; + private final Configuration conf; + + public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) { + this(path, pathType, null, conf); + } + + public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) { + this(path, pathType, filter, null, false, conf); + } + + /** + * @param path file to iterate over + * @param pathType whether or not to treat path as a directory ({@link PathType#LIST}) or + * glob pattern ({@link PathType#GLOB}) + * @param filter if not null, specifies sequence files to be ignored by the iteration + * @param ordering if not null, specifies the order in which to iterate over matching sequence files + * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new + * one for each read from the file + */ + public SequenceFileDirIterable(Path path, + PathType pathType, + PathFilter filter, + Comparator ordering, + boolean reuseKeyValueInstances, + Configuration conf) { + this.path = path; + this.pathType = pathType; + this.filter = filter; + this.ordering = ordering; + this.reuseKeyValueInstances = reuseKeyValueInstances; + this.conf = conf; + } + + @Override + public Iterator> iterator() { + try { + return new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf); + } catch (IOException ioe) { + throw new IllegalStateException(path.toString(), ioe); + } + } + +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java ---------------------------------------------------------------------- diff --git 
a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java new file mode 100644 index 0000000..cf6a871 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.mahout.common.iterator.sequencefile;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.collect.ForwardingIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.Pair;

/**
 * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
 * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
 * restricted with a {@link PathFilter}.
 *
 * Files are opened lazily, one at a time, as iteration reaches them; every reader opened so far is
 * remembered so that {@link #close()} can release it.
 */
public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
    extends ForwardingIterator<Pair<K,V>> implements Closeable {

  private static final FileStatus[] NO_STATUSES = new FileStatus[0];

  private Iterator<Pair<K,V>> delegate;
  /** Per-file iterators opened so far, in the order they were opened. */
  private final List<SequenceFileIterator<K,V>> iterators;

  /**
   * Multifile sequence file iterator where files are specified explicitly by
   * path parameters.
   *
   * @param path files to iterate over, in iteration order; the first element is used to resolve the
   *  {@link FileSystem}, so the array must not be empty
   * @param reuseKeyValueInstances passed through to each {@link SequenceFileIterator}
   * @param conf configuration used to resolve the file system and to read the files
   * @throws IOException if the file system or the status of one of the paths cannot be obtained
   */
  public SequenceFileDirIterator(Path[] path,
                                 boolean reuseKeyValueInstances,
                                 Configuration conf) throws IOException {

    iterators = Lists.newArrayList();
    // we assume all files should exist, otherwise we will bail out.
    FileSystem fs = FileSystem.get(path[0].toUri(), conf);
    FileStatus[] statuses = new FileStatus[path.length];
    for (int i = 0; i < statuses.length; i++) {
      statuses[i] = fs.getFileStatus(path[i]);
    }
    init(statuses, reuseKeyValueInstances, conf);
  }

  /**
   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
   * (depending on pathType parameter).
   *
   * @param path directory or glob pattern to iterate over
   * @param pathType how to interpret {@code path}
   * @param filter if not null, restricts the set of files
   * @param ordering if not null, specifies the order in which files are visited
   * @param reuseKeyValueInstances passed through to each {@link SequenceFileIterator}
   * @param conf configuration used to list and read the files
   * @throws IOException if the list of files cannot be obtained
   */
  public SequenceFileDirIterator(Path path,
                                 PathType pathType,
                                 PathFilter filter,
                                 Comparator<FileStatus> ordering,
                                 boolean reuseKeyValueInstances,
                                 Configuration conf) throws IOException {

    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
    iterators = Lists.newArrayList();
    init(statuses, reuseKeyValueInstances, conf);
  }

  /**
   * Builds the lazy, concatenated delegate over all files. No file is opened here: the transform
   * below only runs when iteration advances to the corresponding file.
   */
  private void init(FileStatus[] statuses,
                    final boolean reuseKeyValueInstances,
                    final Configuration conf) {

    /*
     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
     * was qualified. In this case, which is a corner case, we should assume an
     * empty iterator, not an NPE.
     */
    if (statuses == null) {
      statuses = NO_STATUSES;
    }

    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);

    Iterator<Iterator<Pair<K,V>>> fsIterators =
      Iterators.transform(fileStatusIterator,
        new Function<FileStatus, Iterator<Pair<K,V>>>() {
          @Override
          public Iterator<Pair<K,V>> apply(FileStatus from) {
            try {
              SequenceFileIterator<K,V> iterator = new SequenceFileIterator<>(from.getPath(),
                  reuseKeyValueInstances, conf);
              // remember the reader so close() can release it even if iteration stops early
              iterators.add(iterator);
              return iterator;
            } catch (IOException ioe) {
              throw new IllegalStateException(from.getPath().toString(), ioe);
            }
          }
        });

    delegate = Iterators.concat(fsIterators);
  }

  @Override
  protected Iterator<Pair<K,V>> delegate() {
    return delegate;
  }

  /**
   * Closes every per-file iterator opened so far, most recently opened first, and forgets them.
   * The reversal has to happen here rather than in {@code init}: at construction time the list is
   * still empty because files are opened lazily.
   */
  @Override
  public void close() throws IOException {
    Collections.reverse(iterators); // close in reverse order of opening
    IOUtils.close(iterators);
    iterators.clear();
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java new file mode 100644 index 0000000..1cb4ebc --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator.sequencefile; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Writable; + +/** + *

{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}.

+ */
public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {

  private final Path path;
  private final PathType pathType;
  private final PathFilter filter;
  private final Comparator<FileStatus> ordering;
  private final boolean reuseKeyValueInstances;
  private final Configuration conf;

  /**
   * Same as the six-argument constructor with no filter, no explicit ordering, and without reusing
   * value instances.
   *
   * @param path file to iterate over
   * @param pathType whether to treat path as a directory ({@link PathType#LIST}) or
   *  glob pattern ({@link PathType#GLOB})
   * @param conf configuration handed to the underlying iterator
   */
  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
    this(path, pathType, null, conf);
  }

  /**
   * Same as the six-argument constructor with no explicit ordering and without reusing value
   * instances.
   *
   * @param path file to iterate over
   * @param pathType whether to treat path as a directory ({@link PathType#LIST}) or
   *  glob pattern ({@link PathType#GLOB})
   * @param filter if not null, restricts the set of sequence files that are iterated over
   * @param conf configuration handed to the underlying iterator
   */
  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
    this(path, pathType, filter, null, false, conf);
  }

  /**
   * @param path file to iterate over
   * @param pathType whether to treat path as a directory ({@link PathType#LIST}) or
   *  glob pattern ({@link PathType#GLOB})
   * @param filter if not null, restricts the set of sequence files that are iterated over
   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
   *  one for each read from the file
   * @param conf configuration handed to the underlying iterator
   */
  public SequenceFileDirValueIterable(Path path,
                                      PathType pathType,
                                      PathFilter filter,
                                      Comparator<FileStatus> ordering,
                                      boolean reuseKeyValueInstances,
                                      Configuration conf) {
    this.path = path;
    this.pathType = pathType;
    this.filter = filter;
    this.ordering = ordering;
    this.reuseKeyValueInstances = reuseKeyValueInstances;
    this.conf = conf;
  }

  /**
   * Creates a new {@link SequenceFileDirValueIterator} on every call; nothing is cached between calls.
   *
   * @return iterator over the values of all matching sequence files
   * @throws IllegalStateException wrapping the {@link IOException} (message is the path) if the
   *  iterator cannot be constructed
   */
  @Override
  public Iterator<V> iterator() {
    try {
      return new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
    } catch (IOException ioe) {
      throw new IllegalStateException(path.toString(), ioe);
    }
  }

}
+ http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java ---------------------------------------------------------------------- diff --git
a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java new file mode 100644 index 0000000..908c8bb --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.mahout.common.iterator.sequencefile;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.collect.ForwardingIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.IOUtils;

/**
 * Like {@link SequenceFileValueIterator}, but iterates not just over one
 * sequence file, but many. The input path may be specified as a directory of
 * files to read, or as a glob pattern. The set of files may be optionally
 * restricted with a {@link PathFilter}.
 *
 * Files are opened lazily, one at a time, as iteration reaches them. If a file
 * fails to open, the failure surfaces during iteration as an
 * {@link IllegalStateException}; callers should still call {@link #close()} to
 * release the files opened before it.
 */
public final class SequenceFileDirValueIterator<V extends Writable> extends
    ForwardingIterator<V> implements Closeable {

  private static final FileStatus[] NO_STATUSES = new FileStatus[0];

  private Iterator<V> delegate;
  /** Per-file iterators opened so far, in the order they were opened. */
  private final List<SequenceFileValueIterator<V>> iterators;

  /**
   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
   * (depending on pathType parameter).
   *
   * @param path directory or glob pattern to iterate over
   * @param pathType {@link PathType#GLOB} to glob, anything else to list {@code path}
   * @param filter if not null, restricts the set of files
   * @param ordering if not null, the matched files are sorted with it before iteration
   * @param reuseKeyValueInstances passed through to each {@link SequenceFileValueIterator}
   * @param conf configuration used to resolve the file system and to read the files
   * @throws IOException if the file system or the list of files cannot be obtained
   */
  public SequenceFileDirValueIterator(Path path,
                                      PathType pathType,
                                      PathFilter filter,
                                      Comparator<FileStatus> ordering,
                                      boolean reuseKeyValueInstances,
                                      Configuration conf) throws IOException {
    FileStatus[] statuses;
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    if (filter == null) {
      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
    } else {
      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
    }
    iterators = Lists.newArrayList();
    init(statuses, ordering, reuseKeyValueInstances, conf);
  }

  /**
   * Multifile sequence file iterator where files are specified explicitly by
   * path parameters.
   *
   * @param path files to iterate over; the first element is used to resolve the
   *  {@link FileSystem}, so the array must not be empty
   * @param ordering if not null, the files are sorted with it before iteration
   * @param reuseKeyValueInstances passed through to each {@link SequenceFileValueIterator}
   * @param conf configuration used to resolve the file system and to read the files
   * @throws IOException if the file system or the status of one of the paths cannot be obtained
   */
  public SequenceFileDirValueIterator(Path[] path,
                                      Comparator<FileStatus> ordering,
                                      boolean reuseKeyValueInstances,
                                      Configuration conf) throws IOException {

    iterators = Lists.newArrayList();
    /*
     * we assume all files should exist, otherwise we will bail out.
     */
    FileSystem fs = FileSystem.get(path[0].toUri(), conf);
    FileStatus[] statuses = new FileStatus[path.length];
    for (int i = 0; i < statuses.length; i++) {
      statuses[i] = fs.getFileStatus(path[i]);
    }
    init(statuses, ordering, reuseKeyValueInstances, conf);
  }

  /**
   * Sorts the statuses if requested and builds the lazy, concatenated delegate over all files.
   * No file is opened here: the transform below only runs when iteration advances to the
   * corresponding file, so there is nothing to clean up if this method fails.
   */
  private void init(FileStatus[] statuses,
                    Comparator<FileStatus> ordering,
                    final boolean reuseKeyValueInstances,
                    final Configuration conf) {

    /*
     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
     * was qualified. In this case, which is a corner case, we should assume an
     * empty iterator, not an NPE.
     */
    if (statuses == null) {
      statuses = NO_STATUSES;
    }

    if (ordering != null) {
      Arrays.sort(statuses, ordering);
    }
    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);

    Iterator<Iterator<V>> fsIterators =
      Iterators.transform(fileStatusIterator,
        new Function<FileStatus, Iterator<V>>() {
          @Override
          public Iterator<V> apply(FileStatus from) {
            try {
              SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
                  reuseKeyValueInstances, conf);
              // remember the reader so close() can release it even if iteration stops early
              iterators.add(iterator);
              return iterator;
            } catch (IOException ioe) {
              throw new IllegalStateException(from.getPath().toString(), ioe);
            }
          }
        });

    delegate = Iterators.concat(fsIterators);
  }

  @Override
  protected Iterator<V> delegate() {
    return delegate;
  }

  /**
   * Closes every per-file iterator opened so far, most recently opened first.
   */
  @Override
  public void close() throws IOException {
    Collections.reverse(iterators); // close in reverse order of opening
    IOUtils.close(iterators);
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java new file mode 100644 index 0000000..f17c2a1 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.common.iterator.sequencefile; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.Pair; + +/** + *

{@link Iterable} counterpart to {@link SequenceFileIterator}.

+ */
public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {

  private final Path path;
  private final boolean reuseKeyValueInstances;
  private final Configuration conf;

  /**
   * Like {@link #SequenceFileIterable(Path, boolean, Configuration)} but key and value instances are not reused
   * by default.
   *
   * @param path file to iterate over
   * @param conf configuration handed to the underlying {@link SequenceFileIterator}
   */
  public SequenceFileIterable(Path path, Configuration conf) {
    this(path, false, conf);
  }

  /**
   * @param path file to iterate over
   * @param reuseKeyValueInstances if true, reuses instances of the key and value object instead of creating a new
   *  one for each read from the file
   * @param conf configuration handed to the underlying {@link SequenceFileIterator}
   */
  public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
    this.path = path;
    this.reuseKeyValueInstances = reuseKeyValueInstances;
    this.conf = conf;
  }

  /**
   * Opens the file anew on every call by creating a fresh {@link SequenceFileIterator}.
   *
   * @return iterator over the key/value pairs of the file
   * @throws IllegalStateException wrapping the {@link IOException} (message is the path) if the
   *  iterator cannot be constructed
   */
  @Override
  public Iterator<Pair<K,V>> iterator() {
    try {
      return new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
    } catch (IOException ioe) {
      throw new IllegalStateException(path.toString(), ioe);
    }
  }

}
+ http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java new file mode 100644 index 0000000..bc5c549 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.iterator.sequencefile; + +import java.io.Closeable; +import java.io.IOException; + +import com.google.common.collect.AbstractIterator; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.mahout.common.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair} + * containing key and value.

+ */ +public final class SequenceFileIterator + extends AbstractIterator> implements Closeable { + + private final SequenceFile.Reader reader; + private final Configuration conf; + private final Class keyClass; + private final Class valueClass; + private final boolean noValue; + private K key; + private V value; + private final boolean reuseKeyValueInstances; + + private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class); + + /** + * @throws IOException if path can't be read, or its key or value class can't be instantiated + */ + + public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException { + key = null; + value = null; + FileSystem fs = path.getFileSystem(conf); + path = path.makeQualified(fs); + reader = new SequenceFile.Reader(fs, path, conf); + this.conf = conf; + keyClass = (Class) reader.getKeyClass(); + valueClass = (Class) reader.getValueClass(); + noValue = NullWritable.class.equals(valueClass); + this.reuseKeyValueInstances = reuseKeyValueInstances; + } + + public Class getKeyClass() { + return keyClass; + } + + public Class getValueClass() { + return valueClass; + } + + @Override + public void close() throws IOException { + key = null; + value = null; + Closeables.close(reader, true); + + endOfData(); + } + + @Override + protected Pair computeNext() { + if (!reuseKeyValueInstances || value == null) { + key = ReflectionUtils.newInstance(keyClass, conf); + if (!noValue) { + value = ReflectionUtils.newInstance(valueClass, conf); + } + } + try { + boolean available; + if (noValue) { + available = reader.next(key); + } else { + available = reader.next(key, value); + } + if (!available) { + close(); + return null; + } + return new Pair<>(key, value); + } catch (IOException ioe) { + try { + close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + throw new IllegalStateException(ioe); + } + } + +}