hadoop-common-user mailing list archives

From Denis Mone <monede...@gmail.com>
Subject Re: Code does not enter mapper, reducer class
Date Sun, 18 Sep 2016 18:17:30 GMT
The imports are from the mapreduce package. Also, I use Maven for
dependencies, and the Hadoop version is 2.7.1.
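
For reference, Hadoop ships two MapReduce APIs: the older one under org.apache.hadoop.mapred and the newer one under org.apache.hadoop.mapreduce. Below is a minimal sketch of a check for an import list that mixes the two. HadoopApiCheck and mixesApis are hypothetical names made up for this illustration and are not part of Hadoop; the check is only a string-prefix test on fully qualified class names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Hadoop): flags import lists that mix
// the old and the new MapReduce API packages.
class HadoopApiCheck {
    // Old API package prefix. The trailing dot keeps it from matching "mapreduce".
    static final String OLD_API = "org.apache.hadoop.mapred.";
    // New API package prefix.
    static final String NEW_API = "org.apache.hadoop.mapreduce.";

    // Returns true when the list contains classes from both API generations.
    static boolean mixesApis(List<String> imports) {
        boolean usesOld = false;
        boolean usesNew = false;
        for (String name : imports) {
            if (name.startsWith(OLD_API)) {
                usesOld = true;
            }
            if (name.startsWith(NEW_API)) {
                usesNew = true;
            }
        }
        return usesOld && usesNew;
    }

    public static void main(String[] args) {
        List<String> mixed = Arrays.asList(
                "org.apache.hadoop.mapreduce.Job",
                "org.apache.hadoop.mapreduce.Mapper",
                "org.apache.hadoop.mapred.FileInputFormat");
        List<String> newOnly = Arrays.asList(
                "org.apache.hadoop.mapreduce.Job",
                "org.apache.hadoop.mapreduce.lib.input.FileInputFormat");
        System.out.println(mixesApis(mixed));   // prints true
        System.out.println(mixesApis(newOnly)); // prints false
    }
}
```

In practice the list would come from the import section of the driver, mapper, and reducer source files.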


On 09/18/2016 09:11 PM, Dieter De Witte wrote:
> Maybe also add the list of imports you are using; they differ
> between versions of Hadoop, and mixing them might cause
> counterintuitive behaviour.
>
> Kind Regards,
>
> Dieter De Witte
> Big Data Scientist
> iMinds - Data Science Lab <http://www.datasciencelab.ugent.be/>
> Ghent University
>
> 2016-09-18 19:54 GMT+02:00 Denis Mone <monedenis@gmail.com>:
>
>     Hello, and thanks for your time.
>     What I mean is that I have set breakpoints in the map and reduce
>     functions of my code, and the breakpoints are not hit when the
>     program runs (hence the title "code does not enter class", which
>     is not that informative, really).
>     As for the jobs, there is only one, that of the kmeans algorithm,
>     and it runs to completion with no exception thrown.
>     Here
>     <https://gist.github.com/Denis1990/9f42f73ae126ea47008ee907da64ac6a>
>     is the output of the job.
>     The driver class is this
>     <https://gist.github.com/Denis1990/2978c92567efa17060bb618f66d02ce0>
>
>     On 09/18/2016 02:27 AM, daemeon reiydelle wrote:
>>     What do you mean by "does not enter ... class(es)"?
>>
>>     Does the log show that the scheduler ever accepts the job? (You
>>     may have to turn logging up.) Are other jobs submitted to the
>>     same class under your user scheduled and executed? Which
>>     scheduler are you using, and how is the scheduler class defined?
>>     Is the job getting to a container? Could you give a complete
>>     history of the steps you are taking, please?
>>
>>     Daemeon C.M. Reiydelle
>>     USA (+1) 415.501.0198
>>     London (+44) (0) 20 8144 9872
>>
>>     On Sat, Sep 17, 2016 at 3:56 AM, Denis Mone <monedenis@gmail.com> wrote:
>>
>>         Hello Hadoop users.
>>
>>         I am trying to implement a MapReduce KMeans algorithm
>>         using Hadoop. The problem I have is that the code does not
>>         enter the map and reduce classes. I'm running the application
>>         from IntelliJ IDEA, not using the hadoop binary.
>>
>>         The rest of the email is a sample of my code. If someone can
>>         see something that could help, that would be greatly appreciated.
>>
>>         Thanks in advance.
>>
>>         Here is my driver code:
>>
>>         Job job = Job.getInstance(conf);
>>         job.setJobName("kmeans");
>>         job.setJarByClass(KMeans.class);
>>         FileInputFormat.addInputPath(job, input);
>>         FileOutputFormat.setOutputPath(job, output);
>>         job.setMapperClass(KMeansMapper.class);
>>         job.setReducerClass(KMeansReducer.class);
>>         job.setMapOutputKeyClass(PointVector.class);
>>         job.setMapOutputValueClass(PointVector.class);
>>         job.setOutputKeyClass(Text.class);
>>         job.setOutputValueClass(Text.class);
>>         job.setInputFormatClass(TextInputFormat.class);
>>         job.setOutputFormatClass(TextOutputFormat.class);
>>         job.waitForCompletion(true);
>>
>>         And below are my map and reduce classes:
>>
>>         public class KMeansMapper extends Mapper<LongWritable, Text, PointVector, PointVector> {
>>
>>             private int clusters;
>>             private List<ImmutableTriple<Integer, String, PointVector>> centers;
>>
>>             @Override
>>             protected void setup(Context context) throws IOException, InterruptedException {
>>                 System.out.println("Inside setup");
>>                 this.clusters = Integer.valueOf(context.getConfiguration().get("clusters"));
>>                 this.centers = new ArrayList<>();
>>                 BufferedReader br = new BufferedReader(new FileReader("/home/denis/centers"));
>>                 for (int i = 0; i < clusters; i++) {
>>                     centers.add(DocumentRecordParser.parse(br.readLine()));
>>                 }
>>                 br.close();
>>             }
>>
>>             @Override
>>             public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>>                 PointVector line = DocumentRecordParser.returnPointVector(value.toString());
>>                 System.out.println("Inside map!");
>>                 double minDist = Double.MAX_VALUE;
>>                 double dist;
>>                 PointVector index = null;
>>                 EuclideanDistance ed = new EuclideanDistance();
>>                 for (ImmutableTriple<Integer, String, PointVector> c : centers) {
>>                     dist = ed.compute(line.points(), c.right.points());
>>                     if (dist < minDist) {
>>                         minDist = dist;
>>                         index = c.right;
>>                     }
>>                 }
>>
>>                 context.write(index, line);
>>             }
>>         }
>>
>>         public class KMeansReducer extends Reducer<PointVector, PointVector, Text, Text> {
>>
>>             private double min_dist = Double.MAX_VALUE;
>>
>>             @Override
>>             public void reduce(PointVector center, Iterable<PointVector> points, Context context) throws IOException, InterruptedException {
>>                 EuclideanDistance measure = new EuclideanDistance();
>>                 double distance = 0.0;
>>                 int numOfPoints = 0;
>>                 double diff = 0.0;
>>                 PointVector newCenter = null;
>>                 double[] sums = new double[center.size()];
>>                 for (PointVector p : points) {
>>                     distance += measure.compute(center.points(), p.points());
>>                     if (distance < min_dist) {
>>                         min_dist = distance;
>>                         newCenter = p;
>>                     }
>>                     numOfPoints++;
>>                     sums = MathArrays.ebeAdd(p.points(), sums);
>>                 }
>>                 for (int i = 0; i < sums.length; i++) {
>>                     sums[i] = sums[i] / numOfPoints;
>>                 }
>>                 System.out.println("Old center " + center + " new center: " + newCenter);
>>                 context.write(new Text(newCenter.toString()), new Text(new PointVector(sums).toString()));
>>             }
>>         }
>>
>>         Last but not least, my custom data structure class, PointVector:
>>
>>         public class PointVector implements WritableComparable<PointVector> {
>>
>>             /**
>>              * Keep the tfIdf values of the terms of a document
>>              */
>>             private Vector<Double> data = new Vector<>();
>>
>>             public PointVector(double[] values) {
>>                 this.data = new Vector<>(values.length);
>>                 this.data.addAll(Doubles.asList(values));
>>             }
>>
>>             public PointVector(List<Double> values) {
>>                 this.data = new Vector<>(values.size());
>>                 this.data.addAll(values);
>>             }
>>
>>             public PointVector(String[] values) {
>>                 this.data = new Vector<>(values.length);
>>                 for (String s : values) {
>>                     this.data.add(Double.valueOf(s));
>>                 }
>>             }
>>
>>             public PointVector() {
>>                 this.data = new Vector<>();
>>             }
>>
>>             public double[] points() {
>>                 return Doubles.toArray(data);
>>             }
>>
>>             /**
>>              * Subtract the values of this vector from the PointVector
>>              * passed as argument
>>              * @param subtracted
>>              * @return
>>              */
>>             public PointVector sub(PointVector subtracted) {
>>                 int N = this.data.size();
>>                 double[] vals = new double[N];
>>                 for (int i = 0; i < N; i++) {
>>                     vals[i] = this.data.get(i) - subtracted.get(i);
>>                 }
>>                 return new PointVector(vals);
>>             }
>>
>>             public PointVector add(PointVector vec) {
>>                 int N = this.data.size();
>>                 double[] vals = new double[N];
>>                 for (int i = 0; i < N; i++) {
>>                     vals[i] = this.data.get(i) + vec.get(i);
>>                 }
>>                 return new PointVector(vals);
>>             }
>>
>>             /**
>>              * Compute the dot product of this vector with the one
>>              * passed as argument
>>              * @param vector
>>              * @return
>>              */
>>             public double dotProduct(PointVector vector) {
>>                 int N = this.data.size();
>>                 double sum = 0.0;
>>                 for (int i = 0; i < N; i++) {
>>                     sum += this.data.get(i) * vector.get(i);
>>                 }
>>                 return sum;
>>             }
>>
>>             @Override
>>             public int compareTo(PointVector pointVector) {
>>                 return 0;
>>             }
>>
>>             @Override
>>             public void write(DataOutput dataOutput) throws IOException {
>>                 dataOutput.writeInt(data.size());
>>                 for (double d : data) {
>>                     dataOutput.writeDouble(d);
>>                 }
>>             }
>>
>>             @Override
>>             public void readFields(DataInput dataInput) throws IOException {
>>                 int s = dataInput.readInt(); // read the size of the vector
>>             }
>>
>>             public Double get(int i) {
>>                 return this.data.get(i);
>>             }
>>
>>             public int size() {
>>                 return this.data.size();
>>             }
>>
>>             @Override
>>             public String toString() {
>>                 if (data.isEmpty()) {
>>                     return "[]";
>>                 }
>>                 StringBuilder sb = new StringBuilder();
>>                 sb.append("[");
>>                 for (double d : data) {
>>                     sb.append(d);
>>                     sb.append(", ");
>>                 }
>>                 final int pos = sb.lastIndexOf(",");
>>                 sb.delete(pos, pos + 1);
>>                 sb.append("]");
>>                 return sb.toString();
>>             }
>>         }
>>
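
A side note on the PointVector class quoted above: as posted, readFields reads only the vector size and never the values, and compareTo always returns 0, so every key compares as equal. Whether either point is related to the breakpoints not being hit is not established in this thread. Below is a minimal sketch of a write/readFields pair that mirror each other, plus a value-based compareTo. PointVectorSketch and roundTrip are hypothetical names for illustration; the class is assumed to stand in for PointVector and deliberately does not implement Hadoop's WritableComparable, so that it compiles without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PointVector. In a real job it would implement
// org.apache.hadoop.io.WritableComparable<PointVectorSketch> instead.
class PointVectorSketch implements Comparable<PointVectorSketch> {
    private final List<Double> data = new ArrayList<>();

    PointVectorSketch(double... values) {
        for (double v : values) {
            data.add(v);
        }
    }

    // Same wire format as the posted write(): the size first, then each value.
    public void write(DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (double d : data) {
            out.writeDouble(d);
        }
    }

    // Mirror of write(): read the size, then read that many doubles back.
    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        data.clear(); // Writable instances can be reused, so drop old state first
        for (int i = 0; i < size; i++) {
            data.add(in.readDouble());
        }
    }

    // Order by length, then element by element, so different vectors differ as keys.
    @Override
    public int compareTo(PointVectorSketch other) {
        int bySize = Integer.compare(data.size(), other.data.size());
        if (bySize != 0) {
            return bySize;
        }
        for (int i = 0; i < data.size(); i++) {
            int c = Double.compare(data.get(i), other.data.get(i));
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    // equals and hashCode are kept consistent with compareTo.
    @Override
    public boolean equals(Object o) {
        return o instanceof PointVectorSketch && data.equals(((PointVectorSketch) o).data);
    }

    @Override
    public int hashCode() {
        return data.hashCode();
    }

    @Override
    public String toString() {
        return data.toString();
    }

    // Serialize to bytes and deserialize into a fresh instance, in memory.
    static PointVectorSketch roundTrip(PointVectorSketch v) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            v.write(new DataOutputStream(bytes));
            PointVectorSketch copy = new PointVectorSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PointVectorSketch original = new PointVectorSketch(1.0, 2.5, -3.0);
        System.out.println(roundTrip(original)); // prints [1.0, 2.5, -3.0]
    }
}
```

The round trip through in-memory streams is only a quick way to check that write and readFields agree; it does not exercise the Hadoop shuffle itself.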
