hadoop-mapreduce-user mailing list archives

From Adam Kawa <kawa.a...@gmail.com>
Subject Re: Missing records from HDFS
Date Thu, 28 Nov 2013 23:15:43 GMT
You can find an example/inspiration in the *initialize(InputSplit
genericSplit, TaskAttemptContext context)* method of
*org.apache.hadoop.mapreduce.lib.input.LineRecordReader*.
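
For reference, a minimal sketch of the relevant part of that method
(simplified and untested; it reuses the start/end/pos/in fields of the
CVSLineRecordReader quoted below and is not a drop-in patch):

    // Modeled on LineRecordReader.initialize(): every split except the
    // first skips its (possibly partial) first line, because that line
    // is read to completion by the reader of the previous split.
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    start = split.getStart();
    end = start + split.getLength();
    FSDataInputStream fileIn = split.getPath().getFileSystem(job)
            .open(split.getPath());
    fileIn.seek(start);
    in = new LineReader(fileIn, job);
    if (start != 0) {
        // Discard the partial first line and advance start past it.
        start += in.readLine(new Text(), 0,
                (int) Math.min(Integer.MAX_VALUE, end - start));
    }
    pos = start;

The symmetric half of the contract is that readLine() may run past `end`
to finish the last line, so the record that straddles a split boundary is
emitted exactly once, by the split in which it starts.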


2013/11/23 Azuryy Yu <azuryyyu@gmail.com>

>  There is a problem in 'initialize': in general, we cannot treat
> split.start as the real start, because a FileSplit does not necessarily
> end exactly at a line boundary, so you need to adjust the start in
> 'initialize' to the beginning of the next line whenever start is not
> equal to '0'.
>
> Also, end = start + split.length is not the real end, because it may not
> fall at the end of a line.
>
> So the reader MUST adjust to the real start and end in 'initialize';
> otherwise it may miss some records.
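>
> A concrete illustration (with made-up offsets): suppose one split ends,
> and the next begins, at byte 1000, and a line spans bytes 990-1010. The
> reader of the first split must keep reading past byte 1000 to finish
> that line, and the reader of the second split must skip ahead to byte
> 1011 before emitting its first record; without both adjustments, that
> line is returned truncated or lost.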
>
>  Sure,
>
>  our FileInputFormat implementation:
>
>   public class CVSInputFormat extends
>           FileInputFormat<FileValidatorDescriptor, Text> {
>
>       /*
>        * (non-Javadoc)
>        *
>        * @see
>        * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
>        * .hadoop.mapreduce.InputSplit,
>        * org.apache.hadoop.mapreduce.TaskAttemptContext)
>        */
>       @Override
>       public RecordReader<FileValidatorDescriptor, Text> createRecordReader(
>               InputSplit split, TaskAttemptContext context) {
>           String delimiter = context.getConfiguration().get(
>                   "textinputformat.record.delimiter");
>           byte[] recordDelimiterBytes = null;
>           if (null != delimiter)
>               recordDelimiterBytes = delimiter.getBytes();
>           return new CVSLineRecordReader(recordDelimiterBytes);
>       }
>
>       /*
>        * (non-Javadoc)
>        *
>        * @see
>        * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org
>        * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path)
>        */
>       @Override
>       protected boolean isSplitable(JobContext context, Path file) {
>           CompressionCodec codec = new CompressionCodecFactory(
>                   context.getConfiguration()).getCodec(file);
>           return codec == null;
>       }
>   }
>
>
>  the RecordReader:
>
>
>
>  public class CVSLineRecordReader extends
>         RecordReader<FileValidatorDescriptor, Text> {
>
>     private static final Log LOG = LogFactory
>             .getLog(CVSLineRecordReader.class);
>
>     public static final String CVS_FIRST_LINE = "file.first.line";
>
>     private long start;
>     private long pos;
>     private long end;
>     private LineReader in;
>     private int maxLineLength;
>     private FileValidatorDescriptor key = null;
>     private Text value = null;
>     private Text data = null;
>     private byte[] recordDelimiterBytes;
>
>     public CVSLineRecordReader(byte[] recordDelimiter) {
>         this.recordDelimiterBytes = recordDelimiter;
>     }
>
>     @Override
>     public void initialize(InputSplit genericSplit, TaskAttemptContext context)
>             throws IOException {
>         Properties properties = new Properties();
>         Configuration configuration = context.getConfiguration();
>
>         Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
>                 .getConfiguration());
>         for (Path cacheFile : cacheFiles) {
>             if (cacheFile.toString().endsWith(
>                     context.getConfiguration().get(VALIDATOR_CONF_PATH))) {
>                 properties.load(new FileReader(cacheFile.toString()));
>             }
>         }
>
>         FileSplit split = (FileSplit) genericSplit;
>         Configuration job = context.getConfiguration();
>         this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
>                 Integer.MAX_VALUE);
>         start = split.getStart();
>         end = start + split.getLength();
>         pos = start;
>         final Path file = split.getPath();
>
>         // open the file and seek to the start of the split
>         FileSystem fs = file.getFileSystem(job);
>         FSDataInputStream fileIn = fs.open(split.getPath());
>
>         this.in = generateReader(fileIn, job);
>
>         // if CVS_FIRST_LINE does not exist in conf then the csv file
>         // first line is the header
>         if (properties.containsKey(CVS_FIRST_LINE)) {
>             configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE)
>                     .toString());
>         } else {
>             readData();
>             configuration.set(CVS_FIRST_LINE, data.toString());
>             if (start != 0) {
>                 fileIn.seek(start);
>                 in = generateReader(fileIn, job);
>                 pos = start;
>             }
>         }
>
>         key = new FileValidatorDescriptor();
>         key.setFileName(split.getPath().getName());
>         context.getConfiguration().set("file.name", key.getFileName());
>     }
>
>     @Override
>     public boolean nextKeyValue() throws IOException {
>         int newSize = readData();
>         if (newSize == 0) {
>             key = null;
>             value = null;
>             return false;
>         } else {
>             key.setOffset(this.pos);
>             value = data;
>             return true;
>         }
>     }
>
>     private LineReader generateReader(FSDataInputStream fileIn,
>             Configuration job) throws IOException {
>         if (null == this.recordDelimiterBytes) {
>             return new LineReader(fileIn, job);
>         } else {
>             return new LineReader(fileIn, job, this.recordDelimiterBytes);
>         }
>     }
>
>     private int readData() throws IOException {
>         if (data == null) {
>             data = new Text();
>         }
>         int newSize = 0;
>         while (pos < end) {
>             newSize = in.readLine(data, maxLineLength,
>                     Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
>                             maxLineLength));
>             if (newSize == 0) {
>                 break;
>             }
>             pos += newSize;
>             if (newSize < maxLineLength) {
>                 break;
>             }
>
>             // line too long. try again
>             LOG.info("Skipped line of size " + newSize + " at pos "
>                     + (pos - newSize));
>         }
>         return newSize;
>     }
>
>     @Override
>     public FileValidatorDescriptor getCurrentKey() {
>         return key;
>     }
>
>     @Override
>     public Text getCurrentValue() {
>         return value;
>     }
>
>     @Override
>     public float getProgress() {
>         if (start == end) {
>             return 0.0f;
>         } else {
>             return Math.min(1.0f, (pos - start) / (float) (end - start));
>         }
>     }
>
>     @Override
>     public synchronized void close() throws IOException {
>         if (in != null) {
>             in.close();
>         }
>     }
> }
>
>  Thanks.
>
>
>   From: Azuryy Yu <azuryyyu@gmail.com>
> Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
> Date: Friday, 22 November 2013 12:19
> To: "user@hadoop.apache.org" <user@hadoop.apache.org>
> Subject: Re: Missing records from HDFS
>
>   I do think this is because of your RecordReader. Can you paste your
> code here, and give a small sample of the data?
>
>  Please use pastebin if you want.
>
>
> On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es> wrote:
>
>>  One more thing:
>>
>>  if we split the files, then all the records are processed. The files
>> are 70.5 MB.
>>
>>  Thanks,
>>
>>  Zoraida.-
>>
>>   From: zoraida <zoraida@tid.es>
>> Date: Friday, 22 November 2013 08:59
>>
>> To: "user@hadoop.apache.org" <user@hadoop.apache.org>
>> Subject: Re: Missing records from HDFS
>>
>>   Thanks for your response, Azuryy.
>>
>>  My Hadoop version: 2.0.0-cdh4.3.0
>> InputFormat: a custom class that extends FileInputFormat (a CSV input
>> format)
>> The records are in different files under the same directory.
>> My input path is configured through Oozie via the property
>> mapred.input.dir.
>>
>>  The same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine;
>> it does not discard any records.
>>
>>  Thanks.
>>
>>   From: Azuryy Yu <azuryyyu@gmail.com>
>> Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
>> Date: Thursday, 21 November 2013 07:31
>> To: "user@hadoop.apache.org" <user@hadoop.apache.org>
>> Subject: Re: Missing records from HDFS
>>
>>   What is your Hadoop version, and which InputFormat are you using?
>>
>>  Are these files under one directory, or are there lots of
>> subdirectories? How did you configure the input path in your main class?
>>
>>
>>
>> On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es> wrote:
>>
>>>  Hi all,
>>>
>>>  My job is not reading all the input records. In the input directory I
>>> have a set of files containing a total of 6,000,000 records, but only
>>> 5,997,000 are processed; the Map Input Records counter says 5,997,000.
>>> I have tried downloading the files with getmerge to check how many
>>> records they contain, and that returns the correct number (6,000,000).
>>>
>>>  Do you have any suggestions?
>>>
>>>  Thanks.
>>>
