hadoop-hdfs-user mailing list archives

From Ranjini Rathinam <ranjinibe...@gmail.com>
Subject XmlInputFormat Hadoop -Mapreduce
Date Tue, 17 Dec 2013 12:12:42 GMT
 Hi,

I have attached the code below. Please verify it and suggest what might be
wrong. I am using Hadoop 0.20.


import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
//import org.apache.hadoop.mapreduce.lib.input.XmlInputFormat;

public class ParserDriverMain {

    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    // The code is mostly self explanatory. You need to define the starting and
    // ending tags used to split a record out of the XML file; they are set in
    // the following lines:
    //
    //   conf.set("xmlinput.start", "<startingTag>");
    //   conf.set("xmlinput.end", "</endingTag>");


    public static void runJob(String input, String output) throws IOException {

        Configuration conf = new Configuration();

        conf.set("xmlinput.start", "<Employee>");
        conf.set("xmlinput.end", "</Employee>");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "jobName");

        // Note: these two lines override the paths passed on the command line.
        input = "/user/hduser/Ran/";
        output = "/user/task/Sales/";
        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(ParserDriverMain.class);
        job.setMapperClass(MyParserMapper.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(XmlInputFormatNew.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);

        // Delete any previous output so the job does not fail on an existing directory.
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }


        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}





import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;
import java.io.Reader;
import java.io.StringReader;

/**
 *
 * @author root
 */
public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {

        String xmlString = value1.toString();
        System.out.println("xmlString====" + xmlString);

        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);
        String value = "";
        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();

            // String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
            // String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();

            // Note: this chain only works if <ename> is nested inside <id>,
            // <dept> inside <ename>, and so on; see the note after this class.
            value = root.getChild("id").getChild("ename").getChild("dept")
                    .getChild("sal").getChild("location").getTextTrim();
            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
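
One thing to double-check in the mapper above: the chained getChild() calls
only succeed if <ename> is nested inside <id>, <dept> inside <ename>, and so
on. If the five fields are actually siblings directly under <Employee> (as in
the hypothetical record sketched in the driver comments), every lookup after
the first returns null and the map task fails with a NullPointerException. A
minimal sketch of the sibling version, assuming those element names, would
replace the body of the try block with:

// Hypothetical alternative, assuming <id>, <ename>, <dept>, <sal> and
// <location> are direct children of <Employee>:
Document doc = builder.build(in);
Element emp = doc.getRootElement();   // the <Employee> element
String record = emp.getChildTextTrim("id") + ","
        + emp.getChildTextTrim("ename") + ","
        + emp.getChildTextTrim("dept") + ","
        + emp.getChildTextTrim("sal") + ","
        + emp.getChildTextTrim("location");
context.write(NullWritable.get(), new Text(record));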






import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormatNew extends TextInputFormat {

    // These are the configuration *key names*, not the tag text itself;
    // the driver supplies the actual tags via conf.set(...).
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            startTag = tac.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                // Scan forward to the next start tag within this split.
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        // Copy everything up to and including the end tag into the buffer.
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }

    }


}
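
The START_TAG_KEY and END_TAG_KEY constants hold the configuration key names,
so the record reader picks up whatever tags the driver sets. A small usage
sketch, equivalent to the conf.set("xmlinput.start", ...) lines already in
runJob above:

Configuration conf = new Configuration();
conf.set(XmlInputFormatNew.START_TAG_KEY, "<Employee>");
conf.set(XmlInputFormatNew.END_TAG_KEY, "</Employee>");
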
When I run the job, the following error occurs. Please help.

hduser@localhost:~$ hadoop jar xml.jar ParserDriverMain Ran Sales
13/12/17 15:02:01 WARN mapred.JobClient: Use GenericOptionsParser for
parsing the arguments. Applications should implement Tool for the same.
13/12/17 15:02:01 INFO input.FileInputFormat: Total input paths to process
: 1
13/12/17 15:02:01 INFO mapred.JobClient: Running job: job_201312161706_0021
13/12/17 15:02:02 INFO mapred.JobClient:  map 0% reduce 0%
13/12/17 15:02:12 INFO mapred.JobClient: Task Id :
attempt_201312161706_0021_m_000000_0, Status : FAILED
Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
interface was expected
13/12/17 15:02:18 INFO mapred.JobClient: Task Id :
attempt_201312161706_0021_m_000000_1, Status : FAILED
Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
interface was expected
13/12/17 15:02:24 INFO mapred.JobClient: Task Id :
attempt_201312161706_0021_m_000000_2, Status : FAILED
Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
interface was expected
13/12/17 15:02:33 INFO mapred.JobClient: Job complete: job_201312161706_0021
13/12/17 15:02:33 INFO mapred.JobClient: Counters: 3
13/12/17 15:02:33 INFO mapred.JobClient:   Job Counters
13/12/17 15:02:33 INFO mapred.JobClient:     Launched map tasks=4
13/12/17 15:02:33 INFO mapred.JobClient:     Data-local map tasks=4
13/12/17 15:02:33 INFO mapred.JobClient:     Failed map tasks=1
hduser@localhost:~$









Regards
Ranjini R

On Tue, Dec 17, 2013 at 3:20 PM, unmesha sreeveni <unmeshabiju@gmail.com> wrote:

> Mine is working properly.
> Output
> 13/12/17 15:18:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 13/12/17 15:18:13 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
> 13/12/17 15:18:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
> 13/12/17 15:18:13 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
> 13/12/17 15:18:13 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
> 13/12/17 15:18:13 INFO input.FileInputFormat: Total input paths to process : 1
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: OutputCommitter set in config null
> 13/12/17 15:18:13 INFO mapred.JobClient: Running job: job_local2063093851_0001
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Waiting for map tasks
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Starting task: attempt_local2063093851_0001_m_000000_0
> 13/12/17 15:18:13 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
> 13/12/17 15:18:13 INFO util.ProcessTree: setsid exited with exit code 0
> 13/12/17 15:18:13 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@109c4289
> 13/12/17 15:18:13 INFO mapred.MapTask: Processing split: file:/home/sreeveni/myfiles/xml/conf:0+217
> 13/12/17 15:18:13 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
> 13/12/17 15:18:13 INFO mapred.MapTask: io.sort.mb = 100
> 13/12/17 15:18:13 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/12/17 15:18:13 INFO mapred.MapTask: record buffer = 262144/327680
> ‘<property>
>             <name>dfs.replication</name>
>             <value>1</value>
>      </property>‘
> ‘<property>
>         <name>dfs</name>
>         <value>2</value>
>     </property>‘
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
> 13/12/17 15:18:13 INFO mapred.MapTask: Starting flush of map output
> 13/12/17 15:18:13 INFO mapred.MapTask: Finished spill 0
> 13/12/17 15:18:13 INFO mapred.Task: Task:attempt_local2063093851_0001_m_000000_0 is done. And is in the process of commiting
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
> 13/12/17 15:18:13 INFO mapred.Task: Task 'attempt_local2063093851_0001_m_000000_0' done.
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Finishing task: attempt_local2063093851_0001_m_000000_0
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: Map task executor complete.
> 13/12/17 15:18:13 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
> 13/12/17 15:18:13 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1bf54903
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
> 13/12/17 15:18:13 INFO mapred.Merger: Merging 1 sorted segments
> 13/12/17 15:18:13 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 30 bytes
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
> 13/12/17 15:18:13 INFO mapred.Task: Task:attempt_local2063093851_0001_r_000000_0 is done. And is in the process of commiting
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner:
> 13/12/17 15:18:13 INFO mapred.Task: Task attempt_local2063093851_0001_r_000000_0 is allowed to commit now
> 13/12/17 15:18:13 INFO output.FileOutputCommitter: Saved output of task 'attempt_local2063093851_0001_r_000000_0' to /home/sreeveni/myfiles/xmlOut
> 13/12/17 15:18:13 INFO mapred.LocalJobRunner: reduce > reduce
> 13/12/17 15:18:13 INFO mapred.Task: Task 'attempt_local2063093851_0001_r_000000_0' done.
> 13/12/17 15:18:14 INFO mapred.JobClient:  map 100% reduce 100%
> 13/12/17 15:18:14 INFO mapred.JobClient: Job complete: job_local2063093851_0001
> 13/12/17 15:18:14 INFO mapred.JobClient: Counters: 20
> 13/12/17 15:18:14 INFO mapred.JobClient:   File System Counters
> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of bytes read=780
> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of bytes written=185261
> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of read operations=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of large read operations=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     FILE: Number of write operations=0
> 13/12/17 15:18:14 INFO mapred.JobClient:   Map-Reduce Framework
> 13/12/17 15:18:14 INFO mapred.JobClient:     Map input records=2
> 13/12/17 15:18:14 INFO mapred.JobClient:     Map output records=2
> 13/12/17 15:18:14 INFO mapred.JobClient:     Map output bytes=24
> 13/12/17 15:18:14 INFO mapred.JobClient:     Input split bytes=101
> 13/12/17 15:18:14 INFO mapred.JobClient:     Combine input records=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Combine output records=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce input groups=2
> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce shuffle bytes=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce input records=2
> 13/12/17 15:18:14 INFO mapred.JobClient:     Reduce output records=4
> 13/12/17 15:18:14 INFO mapred.JobClient:     Spilled Records=4
> 13/12/17 15:18:14 INFO mapred.JobClient:     CPU time spent (ms)=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
> 13/12/17 15:18:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=446431232
>
> Can u list ur jar files?
>
>
> On Tue, Dec 17, 2013 at 3:11 PM, unmesha sreeveni <unmeshabiju@gmail.com> wrote:
>
>>  wait let me check out :)
>>
>>
>> On Tue, Dec 17, 2013 at 3:09 PM, Ranjini Rathinam <ranjinibecse@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to process XML via MapReduce, and the output should be in
>>> text format.
>>>
>>> I am using Hadoop 0.20.
>>>
>>> The following error occurred. The code I used is from this link:
>>>
>>> https://github.com/studhadoop/xmlparsing-hadoop/blob/master/XmlParser11.java
>>>
>>>
>>> I have used only the org.apache.hadoop.mapreduce.lib package.
>>>
>>> Even then, the following error occurs. Please help.
>>>
>>> hduser@localhost:~$ hadoop jar xml.jar ParserDriverMain Ran Sales
>>> 13/12/17 15:02:01 WARN mapred.JobClient: Use GenericOptionsParser for
>>> parsing the arguments. Applications should implement Tool for the same.
>>> 13/12/17 15:02:01 INFO input.FileInputFormat: Total input paths to
>>> process : 1
>>> 13/12/17 15:02:01 INFO mapred.JobClient: Running job:
>>> job_201312161706_0021
>>> 13/12/17 15:02:02 INFO mapred.JobClient:  map 0% reduce 0%
>>> 13/12/17 15:02:12 INFO mapred.JobClient: Task Id :
>>> attempt_201312161706_0021_m_000000_0, Status : FAILED
>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
>>> interface was expected
>>> 13/12/17 15:02:18 INFO mapred.JobClient: Task Id :
>>> attempt_201312161706_0021_m_000000_1, Status : FAILED
>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
>>> interface was expected
>>> 13/12/17 15:02:24 INFO mapred.JobClient: Task Id :
>>> attempt_201312161706_0021_m_000000_2, Status : FAILED
>>> Error: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but
>>> interface was expected
>>> 13/12/17 15:02:33 INFO mapred.JobClient: Job complete:
>>> job_201312161706_0021
>>> 13/12/17 15:02:33 INFO mapred.JobClient: Counters: 3
>>> 13/12/17 15:02:33 INFO mapred.JobClient:   Job Counters
>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Launched map tasks=4
>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Data-local map tasks=4
>>> 13/12/17 15:02:33 INFO mapred.JobClient:     Failed map tasks=1
>>> hduser@localhost:~$
>>>
>>>
>>>
>>>
>>>
>>> thanks in advance.
>>>
>>> Ranjini
>>>
>>>
>>
>>
>>
>> --
>> Thanks & Regards
>>
>>  Unmesha Sreeveni U.B
>>
>> Junior Developer
>>
>>
>>
>
>
> --
> Thanks & Regards
>
>  Unmesha Sreeveni U.B
>
> Junior Developer
>
>
>
