hadoop-common-user mailing list archives

From Tarjei Huse <tar...@scanmine.com>
Subject Multiple inputs and Multiple outputs give IOException
Date Tue, 07 Feb 2012 14:06:02 GMT
Hi,

I'm writing an MR job that takes a set of SequenceFiles, extracts a new
key from each record, sends the key + value to the reducer, and has the
reducer write the value to a set of sequence files based on the key.

The job works perfectly if I run it with one sequence file, but with
more than one it fails with this exception:
org.apache.hadoop.ipc.RemoteException: java.io.IOException: File
/user/me/part6/_temporary/_attempt_201202071305_0017_r_000000_2/2011-11-18-22-attempt_201202071305_0017_r_000000_2-r-00000
could only be replicated to 0 nodes, instead of 1
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1520)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:665)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
    ...

Relevant parts of the code:

The job:

public class ProtoDumpConverterJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ProtoDumpConverterJob(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if (remainingArgs.length < 2) {
            System.err.println(String.format("Usage: hadoop jar reindex.jar %s <in> <out>", getClass().getName()));
            ToolRunner.printGenericCommandUsage(System.err);
            return 1;
        }
        String inputPath = remainingArgs[0];
        String outputPath = remainingArgs[remainingArgs.length - 1];

        System.out.println("Options: " + Joiner.on(", ").join(remainingArgs));

        Job job = new Job(conf);
        job.setJarByClass(getClass());
        FileSystem fs = FileSystem.get(conf);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(ProtoDumpConverterMapper.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setReducerClass(ProtoDumpDateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setNumReduceTasks(1);
        SequenceFileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean res = job.waitForCompletion(conf.getBoolean("verbose", true));

        return res ? 0 : 2;
    }
}
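
For completeness, the multi-file runs are just this same driver pointed at
several inputs at once. A minimal sketch of what that looks like inside
run() above (the paths here are made up for illustration):

    // Illustrative only -- the paths are hypothetical. Multiple sequence
    // files can be registered either as a comma-separated list or with
    // repeated addInputPath() calls (both inherited from FileInputFormat):
    SequenceFileInputFormat.setInputPaths(job, "/user/me/dump-1,/user/me/dump-2");
    // ...or, equivalently:
    SequenceFileInputFormat.addInputPath(job, new Path("/user/me/dump-1"));
    SequenceFileInputFormat.addInputPath(job, new Path("/user/me/dump-2"));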

The mapper:

public class ProtoDumpConverterMapper extends Mapper<LongWritable, ContentItemProto, Text, BytesWritable> {
    private static Logger log = Logger.getLogger(ProtoDumpConverterMapper.class);
    private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH");

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    public void map(LongWritable id, ContentItemProto contentItemProto, Context context)
            throws IOException, InterruptedException {
        byte[] data = ContentItemUtils.uncompressItem(contentItemProto.get_item().getData());
        try {
            ContentItem item = ContentItem.parseFramed(data);
            Text date = new Text(dateFormat.format(item.getDateFound()));
            context.write(date, new BytesWritable(data));
        } catch (InvalidProtocolBufferException e) {
            log.error("Error parsing: " + contentItemProto.get_contentobject_id() + " " + id.get());
        }
    }
}

And the reducer:
And the reducer:

public class ProtoDumpDateReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    private static final Logger log = Logger.getLogger(ProtoDumpDateReducer.class);
    private MultipleOutputs<Text, BytesWritable> mos;

    private Path outputPath;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, BytesWritable>(context);
        outputPath = FileOutputFormat.getOutputPath(context);
        log.info("Writing to path: " + outputPath);
    }

    @Override
    public void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            String fileName = key.toString() + "-" + context.getTaskAttemptID();
            // Note: baseOutputPath is built here but never used; fileName is
            // what actually gets passed to mos.write() as the base output path.
            String baseOutputPath = outputPath.getName() + "/" + fileName;
            log.info(String.format("Writing to %s - %s", key, baseOutputPath));
            mos.write(key, value, fileName);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        log.info("Close called");
        mos.close();
    }
}
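
In case it matters: with the new-API MultipleOutputs, the third argument to
write() is treated as a base output path relative to the job output
directory, and each distinct base path gets its own writer and file. A small
self-contained sketch of the resulting naming (values taken from the
exception above):

    // Illustrative sketch of how MultipleOutputs names its files: the base
    // output path passed to mos.write(key, value, basePath) gets the usual
    // "-r-00000" part suffix appended under the job output directory.
    public class MultipleOutputsNamingSketch {
        public static void main(String[] args) {
            String key = "2011-11-18-22";                              // date-hour key from the mapper
            String attemptId = "attempt_201202071305_0017_r_000000_2"; // context.getTaskAttemptID()
            String basePath = key + "-" + attemptId;                   // fileName built in reduce()
            // MultipleOutputs writes records for this base path to:
            System.out.println(basePath + "-r-00000");
            // -> 2011-11-18-22-attempt_201202071305_0017_r_000000_2-r-00000
            // i.e. exactly the file named in the RemoteException above.
        }
    }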


Any tips or ideas on why this fails? A similar job using the old API and
MultipleSequenceFileOutputFormat fails in the same way.
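
For reference, that old-API version was built roughly along these lines (a
minimal sketch from memory, not the exact code; the class name here is made
up): MultipleSequenceFileOutputFormat routes each record to a file whose
name is derived from the key by overriding generateFileNameForKeyValue().

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;

    // Sketch of the old-API (org.apache.hadoop.mapred) equivalent: one
    // sequence file per date-hour key, chosen per record by overriding
    // generateFileNameForKeyValue().
    public class DateKeyedSequenceFileOutputFormat
            extends MultipleSequenceFileOutputFormat<Text, BytesWritable> {

        @Override
        protected String generateFileNameForKeyValue(Text key, BytesWritable value, String name) {
            // "name" is the default leaf file name (e.g. "part-00000");
            // prefixing it with the key yields one output file per key.
            return key.toString() + "-" + name;
        }
    }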

-- 
Regards / Med vennlig hilsen
Tarjei Huse
Mobil: 920 63 413

