hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Azuryy Yu <azury...@gmail.com>
Subject Re: Missing records from HDFS
Date Sat, 23 Nov 2013 03:30:28 GMT
There is problem in the 'initialize', generally, we cannot think
split.start as the real start, because FileSplit cannot split on the end of
the line accurately, so  you need to adjust the start in the 'initialize'
to the start of one line if start is not equal to '0'.

also, end = start + split.length, this is not a real end, because it maybe
not locate the end of the line.

so the Reader MUST adjust the real start and the end in the 'initialize'.
otherwise, maybe miss some records.

 Sure,

 our FileInputFormat implementation:

  public class CVSInputFormat extends

        FileInputFormat<FileValidatorDescriptor, Text> {


     /*

     * (non-Javadoc)

     *

     * @see

     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache

     * .hadoop.mapreduce.InputSplit,

     * org.apache.hadoop.mapreduce.TaskAttemptContext)

     */

    @Override

    public RecordReader<FileValidatorDescriptor, Text> createRecordReader(

            InputSplit split, TaskAttemptContext context) {

        String delimiter = context.getConfiguration().get(

                "textinputformat.record.delimiter");

        byte[] recordDelimiterBytes = null;

        if (null != delimiter)

            recordDelimiterBytes = delimiter.getBytes();

        return new CVSLineRecordReader(recordDelimiterBytes);

    }


     /*

     * (non-Javadoc)

     *

     * @see

     * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org

     * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path)

     */

    @Override

    protected boolean isSplitable(JobContext context, Path file) {

        CompressionCodec codec = new CompressionCodecFactory(

                context.getConfiguration()).getCodec(file);

        return codec == null;

    }

}


 the recordReader:



 public class CVSLineRecordReader extends

        RecordReader<FileValidatorDescriptor, Text> {

    private static final Log LOG =
LogFactory.getLog(CVSLineRecordReader.class);


     public static final String CVS_FIRST_LINE = "file.first.line";


     private long start;

    private long pos;

    private long end;

    private LineReader in;

    private int maxLineLength;

    private FileValidatorDescriptor key = null;

    private Text value = null;

    private Text data = null;

    private byte[] recordDelimiterBytes;


     public CVSLineRecordReader(byte[] recordDelimiter) {

        this.recordDelimiterBytes = recordDelimiter;

    }


     @Override

    public void initialize(InputSplit genericSplit, TaskAttemptContext
context)

            throws IOException {

        Properties properties = new Properties();

        Configuration configuration = context.getConfiguration();


         Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context

                .getConfiguration());

        for (Path cacheFile : cacheFiles) {

            if (cacheFile.toString().endsWith(

                    context.getConfiguration().get(VALIDATOR_CONF_PATH))) {

                properties.load(new FileReader(cacheFile.toString()));

            }

        }


         FileSplit split = (FileSplit) genericSplit;

        Configuration job = context.getConfiguration();

        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",

                Integer.MAX_VALUE);

        start = split.getStart();

        end = start + split.getLength();

        pos = start;

        final Path file = split.getPath();


         // open the file and seek to the start of the split

        FileSystem fs = file.getFileSystem(job);

        FSDataInputStream fileIn = fs.open(split.getPath());


         this.in = generateReader(fileIn, job);


         // if CVS_FIRST_LINE does not exist in conf then the csv file
first line

        // is the header

        if (properties.containsKey(CVS_FIRST_LINE)) {

            configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE)

                    .toString());

        } else {

            readData();

            configuration.set(CVS_FIRST_LINE, data.toString());

            if (start != 0) {

                fileIn.seek(start);

                in = generateReader(fileIn, job);

                pos = start;

            }

        }


         key = new FileValidatorDescriptor();

        key.setFileName(split.getPath().getName());

        context.getConfiguration().set("file.name", key.getFileName());


     }


     @Override

    public boolean nextKeyValue() throws IOException {

        int newSize = readData();

        if (newSize == 0) {

            key = null;

            value = null;

            return false;

        } else {

            key.setOffset(this.pos);

            value = data;

            return true;

        }

    }


     private LineReader generateReader(FSDataInputStream fileIn,

            Configuration job) throws IOException {

        if (null == this.recordDelimiterBytes) {

            return new LineReader(fileIn, job);

        } else {

            return new LineReader(fileIn, job, this.recordDelimiterBytes);

        }


     }


     private int readData() throws IOException {

        if (data == null) {

            data = new Text();

        }

        int newSize = 0;

        while (pos < end) {

            newSize = in.readLine(data, maxLineLength,

                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),

                            maxLineLength));

            if (newSize == 0) {

                break;

            }

            pos += newSize;

            if (newSize < maxLineLength) {

                break;

            }


             // line too long. try again

            LOG.info("Skipped line of size " + newSize + " at pos "

                    + (pos - newSize));

        }

        return newSize;

    }


     @Override

    public FileValidatorDescriptor getCurrentKey() {

        return key;

    }


     @Override

    public Text getCurrentValue() {

        return value;

    }


     @Override

    public float getProgress() {

        if (start == end) {

            return 0.0f;

        } else {

            return Math.min(1.0f, (pos - start) / (float) (end - start));

        }

    }


     @Override

    public synchronized void close() throws IOException {

        if (in != null) {

            in.close();

        }

    }

}

 Thanks.


  De: Azuryy Yu <azuryyyu@gmail.com>
Responder a: "user@hadoop.apache.org" <user@hadoop.apache.org>
Fecha: viernes, 22 de noviembre de 2013 12:19
Para: "user@hadoop.apache.org" <user@hadoop.apache.org>
Asunto: Re: Missing records from HDFS

  I do think this is because of your RecorderReader, can you paste your
code here? and give a piece of data example.

 please use pastebin if you want.


On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es>wrote:

>  One more thing,
>
>  if we split the files then all the records are processed. Files are
> of 70,5MB.
>
>  Thanks,
>
>  Zoraida.-
>
>   De: zoraida <zoraida@tid.es>
> Fecha: viernes, 22 de noviembre de 2013 08:59
>
> Para: "user@hadoop.apache.org" <user@hadoop.apache.org>
> Asunto: Re: Missing records from HDFS
>
>   Thanks for your response Azuryy.
>
>  My hadoop version: 2.0.0-cdh4.3.0
> InputFormat: a custom class that extends from FileInputFormat(csv input
> format)
> These fiels are under the same directory, different files.
> My input path is configured using oozie throughout the propertie
> mapred.input.dir.
>
>
>  Same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine. Does
> not discard any record.
>
>  Thanks.
>
>   De: Azuryy Yu <azuryyyu@gmail.com>
> Responder a: "user@hadoop.apache.org" <user@hadoop.apache.org>
> Fecha: jueves, 21 de noviembre de 2013 07:31
> Para: "user@hadoop.apache.org" <user@hadoop.apache.org>
> Asunto: Re: Missing records from HDFS
>
>   what's your hadoop version? and which InputFormat are you used?
>
>  these files under one directory or there are lots of subdirectory? how
> ddi you configure input path in your main?
>
>
>
> On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es>wrote:
>
>>  Hi all,
>>
>>  my job is not reading all the input records. In the input directory I
>> have a set of files containing a total of 6000000 records but only 5997000
>> are processed. The Map Input Records counter says 5997000.
>> I have tried downloading the files with a getmerge to check how many
>> records would return but the correct number is returned(6000000).
>>
>>  Do you have any suggestion?
>>
>>  Thanks.
>>
>> ------------------------------
>>
>> Este mensaje se dirige exclusivamente a su destinatario. Puede consultar
>> nuestra política de envío y recepción de correo electrónico en el enlace
>> situado más abajo.
>> This message is intended exclusively for its addressee. We only send and
>> receive email on the basis of the terms set out at:
>> http://www.tid.es/ES/PAGINAS/disclaimer.aspx
>>
>
>
> ------------------------------
>
> Este mensaje se dirige exclusivamente a su destinatario. Puede consultar
> nuestra política de envío y recepción de correo electrónico en el enlace
> situado más abajo.
> This message is intended exclusively for its addressee. We only send and
> receive email on the basis of the terms set out at:
> http://www.tid.es/ES/PAGINAS/disclaimer.aspx
>


------------------------------

Este mensaje se dirige exclusivamente a su destinatario. Puede consultar
nuestra política de envío y recepción de correo electrónico en el enlace
situado más abajo.
This message is intended exclusively for its addressee. We only send and
receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx

Mime
View raw message