From: ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es>
Date: Fri, 22 Nov 2013 11:33:40 +0000
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: Missing records from HDFS
Sure,

our FileInputFormat implementation:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// FileValidatorDescriptor is our own key type, defined elsewhere in this package.
public class CVSInputFormat extends
        FileInputFormat<FileValidatorDescriptor, Text> {

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
     * .hadoop.mapreduce.InputSplit,
     * org.apache.hadoop.mapreduce.TaskAttemptContext)
     */
    @Override
    public RecordReader<FileValidatorDescriptor, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // Use a custom record delimiter when one is configured.
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes();
        return new CVSLineRecordReader(recordDelimiterBytes);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org
     * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path)
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Only uncompressed files are splittable.
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
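
For context, a minimal driver sketch (hypothetical, not from the thread) showing how an input format like this is typically wired into a job. The job name and paths are placeholders; in our jobs the input path actually comes from Oozie via mapred.input.dir, which is the property the addInputPath call below ends up setting.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver, for illustration only: wires CVSInputFormat into a
// job in the standard way. Job name and paths are placeholders.
public class CVSJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // CVSInputFormat passes this delimiter to CVSLineRecordReader; when
        // it is unset, the reader falls back to LineReader's default
        // newline handling.
        conf.set("textinputformat.record.delimiter", "\n");
        Job job = Job.getInstance(conf, "csv-validation");
        job.setJarByClass(CVSJobDriver.class);
        job.setInputFormatClass(CVSInputFormat.class);
        // Mapper/reducer setup omitted for brevity.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}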


the recordReader:

import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// VALIDATOR_CONF_PATH is a constant defined elsewhere in our code
// (assumed static-imported here).
public class CVSLineRecordReader extends
        RecordReader<FileValidatorDescriptor, Text> {

    private static final Log LOG = LogFactory.getLog(CVSLineRecordReader.class);

    public static final String CVS_FIRST_LINE = "file.first.line";

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private FileValidatorDescriptor key = null;
    private Text value = null;
    private Text data = null;
    private byte[] recordDelimiterBytes;

    public CVSLineRecordReader(byte[] recordDelimiter) {
        this.recordDelimiterBytes = recordDelimiter;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        Properties properties = new Properties();
        Configuration configuration = context.getConfiguration();

        // Load the validator configuration from the distributed cache.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());
        for (Path cacheFile : cacheFiles) {
            if (cacheFile.toString().endsWith(
                    context.getConfiguration().get(VALIDATOR_CONF_PATH))) {
                properties.load(new FileReader(cacheFile.toString()));
            }
        }

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        pos = start;
        final Path file = split.getPath();

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        this.in = generateReader(fileIn, job);

        // if CVS_FIRST_LINE does not exist in conf then the csv file first
        // line is the header
        if (properties.containsKey(CVS_FIRST_LINE)) {
            configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE)
                    .toString());
        } else {
            readData();
            configuration.set(CVS_FIRST_LINE, data.toString());
            if (start != 0) {
                fileIn.seek(start);
                in = generateReader(fileIn, job);
                pos = start;
            }
        }

        key = new FileValidatorDescriptor();
        key.setFileName(split.getPath().getName());
        context.getConfiguration().set("file.name", key.getFileName());
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        int newSize = readData();
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            key.setOffset(this.pos);
            value = data;
            return true;
        }
    }

    private LineReader generateReader(FSDataInputStream fileIn,
            Configuration job) throws IOException {
        if (null == this.recordDelimiterBytes) {
            return new LineReader(fileIn, job);
        } else {
            return new LineReader(fileIn, job, this.recordDelimiterBytes);
        }
    }

    private int readData() throws IOException {
        if (data == null) {
            data = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(data, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }

            // line too long. try again
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }
        return newSize;
    }

    @Override
    public FileValidatorDescriptor getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
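
Since the records only go missing when a file spans more than one split, it is worth comparing the split handling above with what Hadoop's stock LineRecordReader does: there, every split except the first discards its first (possibly partial) line, and every split deliberately reads one line past its own end, so each record is consumed by exactly one split. The sketch below is a simplified paraphrase of that protocol (an illustration, not the literal Hadoop source; the field names just mirror the code above). CVSLineRecordReader differs in both respects — it loops on pos < end and never skips the partial first line of a later split — which is worth ruling out as the source of the discrepancy.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// Simplified paraphrase of the split-boundary protocol in Hadoop's stock
// LineRecordReader (illustration only, not the literal source).
class SplitBoundarySketch {
    private long start, pos, end;
    private int maxLineLength = Integer.MAX_VALUE;
    private LineReader in;
    private Text value = new Text();

    void initialize(FileSplit split, Configuration job) throws IOException {
        start = split.getStart();
        end = start + split.getLength();
        FileSystem fs = split.getPath().getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        fileIn.seek(start);
        in = new LineReader(fileIn, job);
        if (start != 0) {
            // A later split usually lands mid-line: skip to the end of that
            // line, because the previous split owns it.
            start += in.readLine(new Text(), 0,
                    (int) Math.min(Integer.MAX_VALUE, end - start));
        }
        pos = start;
    }

    boolean nextKeyValue() throws IOException {
        int newSize = 0;
        // "<=" rather than "<": read one line past `end`, because the line
        // straddling the boundary belongs to this split.
        while (pos <= end) {
            newSize = in.readLine(value, maxLineLength, maxLineLength);
            if (newSize == 0) {
                break;              // end of stream
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;              // complete record read
            }
            // otherwise the line was too long: keep consuming
        }
        return newSize != 0;
    }
}

That said, since the same code and input work on 2.0.0-cdh4.2.1 (see below), a behavioral change between the two releases — for example in LineReader's handling of custom record delimiters — is also worth investigating before blaming the reader alone.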


Thanks.


From: Azuryy Yu <azuryyyu@gmail.com>
Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Date: Friday, 22 November 2013 12:19
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: Missing records from HDFS

I do think this is because of your RecordReader; can you paste your code here and give a small example of the data?

Please use pastebin if you want.


On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es> wrote:
One more thing,

if we split the files then all the records are processed. The files are 70.5 MB.

Thanks,

Zoraida.-
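
A quick way to check whether 70.5 MB files are in fact being cut into multiple splits is to ask the input format directly. The sketch below is a hypothetical diagnostic, not code from the thread; the input path is a placeholder argument.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical diagnostic: prints the splits a CVSInputFormat job would
// receive for a given input directory. If each 70.5 MB file yields more
// than one split, boundary handling in the record reader is worth a close
// look; if each file comes back as a single split, boundaries can be
// ruled out.
public class SplitCheck {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-check");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        List<InputSplit> splits = new CVSInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            System.out.println(split);  // a FileSplit prints path:start+length
        }
        System.out.println("total splits: " + splits.size());
    }
}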

From: zoraida <zoraida@tid.es>
Date: Friday, 22 November 2013 08:59
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: Missing records from HDFS

Thanks for your response Azuryy.

My Hadoop version: 2.0.0-cdh4.3.0
InputFormat: a custom class that extends FileInputFormat (a CSV input format)
These files are under the same directory, as separate files.
My input path is configured through Oozie via the property mapred.input.dir.

The same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine; it does not discard any record.

Thanks.

From: Azuryy Yu <azuryyyu@gmail.com>
Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Date: Thursday, 21 November 2013 07:31
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: Missing records from HDFS

What's your Hadoop version, and which InputFormat are you using?

Are these files under one directory, or are there lots of subdirectories? How did you configure the input path in your main?



On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ <zoraida@tid.es> wrote:
Hi all,

my job is not reading all the input records. In the input directory I have a set of files containing a total of 6,000,000 records, but only 5,997,000 are processed: the Map Input Records counter says 5,997,000.
I have tried downloading the files with a getmerge to check how many records that would return, and it returns the correct number (6,000,000).

Do you have any suggestion?

Thanks.
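
For the same kind of cross-check without getmerge, here is a small standalone sketch (hypothetical, not from the thread) that counts records directly through the HDFS client, bypassing input splits entirely, so its total can be compared with the Map Input Records counter. The input directory is a placeholder argument.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

// Hypothetical cross-check utility: counts newline-delimited records in
// every file directly under a directory, with no splitting involved.
public class RecordCount {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        long total = 0;
        for (FileStatus status : fs.listStatus(new Path(args[0]))) {
            if (status.isDirectory()) continue;  // skip subdirectories
            FSDataInputStream in = fs.open(status.getPath());
            LineReader reader = new LineReader(in, conf);
            Text line = new Text();
            while (reader.readLine(line) > 0) {
                total++;
            }
            reader.close();
        }
        System.out.println("records: " + total);
    }
}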



________________________________
This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx