avro-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xiaosong niu <xs...@yahoo.com>
Subject reducer not running
Date Thu, 09 Feb 2012 01:44:13 GMT
Hi 


I am pretty new to Avro and am trying to write a MapReduce job that reads
text input and writes Avro output. It is a simple word count based on the
example, but the reducer I wrote does not seem to run: all of the output comes
directly from the map phase. When I check the logs, the System.out output from
the reducer does not appear at all.


Here is the code. Can someone help?

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;


public class AvroTest extends Configured implements Tool{

   public static class Map extends MapReduceBase implements
    Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Long>> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<AvroKey<Utf8>, AvroValue<Long>>
output,
                Reporter arg3) throws IOException {
            // TODO Auto-generated method stub

            String valueLine = value.toString();
            String[] values = valueLine.split("=");
            System.out.println("key is "+values[0].toString()+" value is 1");
            output.collect(new AvroKey(new Utf8(values[0])), new AvroValue(
                    new Long(1)));
            

        }

    }
    
    public static class Reduce extends AvroReducer {
        public void reduce(Utf8 word, Iterable<Long> counts, AvroCollector<Pair<Utf8,Long>>
output, Reporter reporter) throws IOException {

            long sum = 0l;
            for (long count:counts) {
                sum++;
            }
            System.out.println("key is "+word.toString()+" value is " +sum);
            Pair<Utf8,Long> outputValue = new Pair<Utf8,Long> (word.toString(),sum);
        
            output.collect(outputValue);
        }
    }

    public int run(String[] args) throws Exception {
           Configuration passed_conf = getConf();
        JobConf conf = new JobConf(passed_conf,AvroTest.class);
        conf.setJobName("Avro Test");
        conf.setInputFormat(TextInputFormat.class);
        
        conf.setMapperClass(Map.class);
        AvroJob.setOutputSchema(conf, new Pair(new Utf8(""), 0L).getSchema());
        AvroJob.setReducerClass(conf, Reduce.class);

        FileInputFormat.setInputPaths(conf, new Path(new String("/user/sam/test_input*")));
        FileOutputFormat.setOutputPath(conf, new Path(new String("/user/sam/avro_output")));

        JobClient.runJob(conf);
        return 0;


    }

    public static void main(String[] args) throws Exception {

        GenericOptionsParser optParser = new GenericOptionsParser(args);
        int exitCode = ToolRunner.run(optParser.getConfiguration(), new AvroTest(),
optParser.getRemainingArgs());               
        System.exit(exitCode);

    }

}

Thanks,

Sam

Mime
View raw message