hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Denis Mone <monede...@gmail.com>
Subject Code does not enter mapper, reducer class
Date Sat, 17 Sep 2016 10:56:46 GMT
Hello Hadoop users,

     I am trying to implement a MapReduce k-means algorithm using Hadoop. 
The problem I have is that the code never enters the map and reduce 
classes. I'm running the application from IntelliJ IDEA, not through the 
Hadoop binary.

The rest of this email is a sample of my code. If anyone spots 
something that could help, it would be greatly appreciated.

Thanks in advance.

Here is my driver code:

// Configure one k-means MapReduce pass.
// Job.getInstance copies conf, so the "clusters" property that KMeansMapper.setup reads
// must already be set on conf at this point.
Job job = Job.getInstance(conf);
job.setJobName("kmeans");
job.setJarByClass(KMeans.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(KMeansMapper.class);
job.setReducerClass(KMeansReducer.class);
// Map output types differ from the final output types, so both pairs are declared.
job.setMapOutputKeyClass(PointVector.class);
job.setMapOutputValueClass(PointVector.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// waitForCompletion reports failure through its return value; surface it instead of ignoring it.
if (!job.waitForCompletion(true)) {
    throw new IllegalStateException("kmeans job failed");
}

And below are my map and reduce classes:

public class KMeansMapperextends Mapper<LongWritable, Text, PointVector, PointVector>
{

     private int clusters; private List<ImmutableTriple<Integer, String, PointVector>>centers;
@Override protected void setup(Context context)throws IOException, InterruptedException {
         System.out.println("Inside setup"); this.clusters = Integer.valueOf(context.getConfiguration().get("clusters"));
this.centers =new ArrayList<>(); BufferedReader br =new BufferedReader(new FileReader("/home/denis/centers"));
for(int i =0; i <clusters; i++) {
             centers.add(DocumentRecordParser.parse(br.readLine())); }
         br.close(); }

     @Override public void map(LongWritable key, Text value, Context context)throws IOException,
InterruptedException {
         PointVector line = DocumentRecordParser.returnPointVector(value.toString()); System.out.println("Inside
map!"); double minDist = Double.MAX_VALUE; double dist;PointVector index =null; EuclideanDistance
ed =new EuclideanDistance(); for (ImmutableTriple<Integer, String, PointVector> c :centers)
{
             dist = ed.compute(line.points(), c.right.points()); if (dist < minDist) {
                 minDist = dist; index = c.right; }
         }

         context.write(index, line); }
}

public class KMeansReducerextends Reducer<PointVector, PointVector, Text, Text> {
     private double min_dist = Double.MAX_VALUE; @Override public void reduce(PointVector
center, Iterable<PointVector> points, Context context)throws IOException, InterruptedException
{
         EuclideanDistance measure =new EuclideanDistance(); double distance =0.0; int numOfPoints
=0;double diff =0.0; PointVector newCenter =null; double [] sums =new double[center.size()];
for (PointVector p : points) {
             distance += measure.compute(center.points(), p.points()); if (distance <min_dist)
{
                 min_dist = distance; newCenter = p; }
             numOfPoints++; sums = MathArrays.ebeAdd(p.points(), sums);}
         for (int i =0; i < sums.length; i++) {
             sums[i] = sums[i] / numOfPoints; }System.out.println("Old center " + center +"
new center: " + newCenter); context.write(new Text(newCenter.toString()), new Text(new PointVector(sums).toString()));
}
}

Last but not least, my custom data structure class, PointVector:

public class PointVectorimplements WritableComparable<PointVector> {
     /** * Keep the tfIdf values of the terms of a document */ private Vector<Double>data
=new Vector<>(); public PointVector(double [] values) {
         this.data =new Vector<>(values.length); this.data.addAll(Doubles.asList(values));
}

     public PointVector(List<Double> values) {
         this.data =new Vector<>(values.size()); this.data.addAll(values); }

     public PointVector(String [] values) {
         this.data =new Vector<>(values.length); for (String s: values) {
             this.data.add(Double.valueOf(s)); }
     }

     public PointVector() {
         this.data =new Vector<>(); }

     public double[]points() {
         return Doubles.toArray(data); }

     /** * Subtract the values of this vector from the PointVector passed as 
argument * @param subtracted * @return */ public PointVectorsub(PointVector subtracted) {
         int N =this.data.size(); double [] vals =new double[N]; for (int i =0; i < N;
i++) {
             vals[i] =this.data.get(i) - subtracted.get(i); }
         return new PointVector(vals); }

     public PointVectoradd(PointVector vec) {
         int N =this.data.size(); double [] vals =new double[N]; for (int i =0; i < N;
i++) {
             vals[i] =this.data.get(i) + vec.get(i); }
         return new PointVector(vals); }

     /** * Compute the dot product of this vector with the one passed as 
argument * @param vector * @return */ public double dotProduct(PointVector vector) {
         int N =this.data.size(); double sum =0.0; for (int i =0; i < N; i++) {
             sum +=this.data.get(i) * vector.get(i); }
         return sum; }

     @Override public int compareTo(PointVector pointVector) {
         return 0; }

     @Override public void write(DataOutput dataOutput)throws IOException {
         dataOutput.writeInt(data.size()); for (double d :data) {
             dataOutput.writeDouble(d); }
     }

     @Override public void readFields(DataInput dataInput)throws IOException {
         int s = dataInput.readInt(); // read the size of the vector }

     public Doubleget(int i) {
         return this.data.get(i); }

     public int size() {
         return this.data.size(); }

     @Override public StringtoString() {
         if (data.isEmpty()) {
             return "[]"; }
         StringBuilder sb =new StringBuilder(); sb.append("["); for (double d :data) {
             sb.append(d); sb.append(", "); }
         final int pos = sb.lastIndexOf(","); sb.delete(pos, pos +1); sb.append("]"); return
sb.toString(); }
}


Mime
View raw message