Return-Path: Delivered-To: apmail-hadoop-mapreduce-user-archive@minotaur.apache.org Received: (qmail 49400 invoked from network); 8 Oct 2010 12:42:21 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 8 Oct 2010 12:42:21 -0000 Received: (qmail 1620 invoked by uid 500); 8 Oct 2010 12:42:21 -0000 Delivered-To: apmail-hadoop-mapreduce-user-archive@hadoop.apache.org Received: (qmail 1506 invoked by uid 500); 8 Oct 2010 12:42:20 -0000 Mailing-List: contact mapreduce-user-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-user@hadoop.apache.org Delivered-To: mailing list mapreduce-user@hadoop.apache.org Received: (qmail 1498 invoked by uid 99); 8 Oct 2010 12:42:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 12:42:19 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=10.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [134.34.240.46] (HELO pyrimidin.rz.uni-konstanz.de) (134.34.240.46) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 12:42:11 +0000 X-IronPort-AV: E=Sophos;i="4.57,302,1283731200"; d="java'?scan'208";a="12758372" Received: from kousseri.rz.uni-konstanz.de ([134.34.240.55]) by unitis.rz.uni-konstanz.de with ESMTP; 08 Oct 2010 12:41:50 +0000 Received: from [192.168.1.80] (pD9E96AB0.dip.t-dialin.net [217.233.106.176]) by kousseri.rz.uni-konstanz.de (Postfix) with ESMTP id 25DC21620047 for ; Fri, 8 Oct 2010 14:41:48 +0200 (CEST) Message-ID: <4CAF1188.9090407@uni-konstanz.de> Date: Fri, 08 Oct 2010 14:41:44 +0200 From: "Johannes.Lichtenberger" User-Agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.12) Gecko/20100915 Thunderbird/3.0.8 MIME-Version: 1.0 To: mapreduce-user@hadoop.apache.org Subject: Re: NullPointerException References: <4CAF0820.60104@uni-konstanz.de> In-Reply-To: <4CAF0820.60104@uni-konstanz.de> X-Enigmail-Version: 1.0.1 Content-Type: multipart/mixed; boundary="------------010709040106050409060306" This is a multi-part message in MIME format. --------------010709040106050409060306 Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: 7bit And attached my XMLRecordReader. regards, Johannes --------------010709040106050409060306 Content-Type: text/x-java; name="XMLRecordReader.java" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="XMLRecordReader.java" /** * Copyright (c) 2010, Distributed Systems Group, University of Konstanz * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED AS IS AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. * */ package com.treetank.wikipedia.hadoop; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; import java.io.Writer; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Locale; import javax.xml.namespace.QName; import javax.xml.stream.EventFilter; import javax.xml.stream.FactoryConfigurationError; import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLEventReader; import javax.xml.stream.XMLEventWriter; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.StartElement; import javax.xml.stream.events.XMLEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.slf4j.LoggerFactory; import com.treetank.utils.LogWrapper; /** *

XMLRecordReader

* *

* Read an XML record. *

* * @author Johannes Lichtenberger, University of Konstanz * */ public final class XMLRecordReader extends RecordReader { /** * Log wrapper {@link LogWrapper}. */ private static final LogWrapper LOGWRAPPER = new LogWrapper(LoggerFactory.getLogger(XMLRecordReader.class)); /** Start of record. */ private transient long mStart; /** End of record. */ private transient long mEnd; /** Key is a date (timestamp) {@link Date}. */ private transient DateWritable mKey; /** File input stream {@link FSDataInputStream}. */ private transient FSDataInputStream mFileIn; /** Start of record {@link EventFilter}. */ private transient EventFilter mBeginFilter; /** End of record {@link EventFilter}. */ private transient EventFilter mEndFilter; /** StAX parser {@link XMLEventReader}. */ private transient XMLEventReader mReader; /** {@link QName} (Date) which is the key. */ private transient QName mDate; /** Value is a list of XML events. */ private transient Text mValue; /** Record element identifier {@see StartElement}. */ private transient StartElement mRecordElem; /** {@link Writer} to hold value data. */ private transient XMLEventWriter mEventWriter; private transient Writer mWriter; /** Counter to track process. */ private transient int mCountEvents; /** * Constructor. */ public XMLRecordReader() { // Default constructor. } @Override public void initialize(final InputSplit paramGenericSplit, final TaskAttemptContext paramContext) throws IOException { final FileSplit split = (FileSplit)paramGenericSplit; final Configuration conf = paramContext.getConfiguration(); mStart = split.getStart(); mEnd = mStart + split.getLength(); mValue = new Text(); mWriter = new StringWriter(); try { mEventWriter = XMLOutputFactory.newInstance().createXMLEventWriter(mWriter); } catch (final XMLStreamException e) { LOGWRAPPER.error(e.getMessage(), e); } catch (final FactoryConfigurationError e) { LOGWRAPPER.error(e.getMessage(), e); } mCountEvents = 0; final Path file = split.getPath(); // Open the file and seek to the start of the split. final FileSystem fileSys = file.getFileSystem(conf); final FSDataInputStream fileIn = fileSys.open(split.getPath()); fileIn.seek(mStart); final CompressionCodecFactory comprCodecs = new CompressionCodecFactory(conf); final CompressionCodec codec = comprCodecs.getCodec(file); InputStream input = fileIn; if (codec != null) { input = codec.createInputStream(fileIn); mEnd = Long.MAX_VALUE; } input = new BufferedInputStream(input); final XMLInputFactory xmlif = XMLInputFactory.newInstance(); try { mReader = xmlif.createXMLEventReader(input); } catch (final XMLStreamException e) { LOGWRAPPER.error(e.getMessage(), e); } // Create start/end record filters. final String recordIdentifier = conf.get("record_element_name"); final String recordNsPrefix = conf.get("namespace_prefix"); final String recordNsURI = conf.get("namespace_URI"); if (recordIdentifier == null) { throw new IllegalStateException("Record identifier must be specified (record_elem_name)!"); } if (recordNsPrefix == null && recordNsURI == null) { mRecordElem = XMLEventFactory.newFactory().createStartElement(new QName(recordIdentifier), null, null); } else { mRecordElem = XMLEventFactory.newFactory().createStartElement( new QName(recordNsURI, recordIdentifier, recordNsPrefix), null, null); } mBeginFilter = new EventFilter() { @Override public boolean accept(final XMLEvent paramEvent) { return paramEvent.isStartElement() && paramEvent.asStartElement().equals(mRecordElem); } }; mEndFilter = new EventFilter() { @Override public boolean accept(final XMLEvent paramEvent) { return paramEvent.isEndElement() && paramEvent.asEndElement().getName().equals(mRecordElem.getName()); } }; mDate = new QName(conf.get("timestamp")); } @Override public DateWritable getCurrentKey() { return mKey; } @Override public Text getCurrentValue() { return mValue; } @Override public float getProgress() { return mCountEvents; } @Override public synchronized void close() throws IOException { try { mReader.close(); } catch (final XMLStreamException e) { LOGWRAPPER.error(e.getMessage(), e); } } @Override public boolean nextKeyValue() throws IOException, InterruptedException { mValue.clear(); boolean retVal = false; try { // Moves to start of record. final boolean foundStartEvent = moveToEvent(mReader, mBeginFilter, false); if (foundStartEvent) { final boolean foundEndEvent = moveToEvent(mReader, mEndFilter, true); if (foundEndEvent) { // Add last element to the writer. mReader.nextEvent().writeAsEncodedUnicode(mWriter); retVal = true; mWriter.flush(); mValue.set(mWriter.toString()); } } } catch (final XMLStreamException e) { LOGWRAPPER.error(e.getMessage(), e); } return retVal; } /** * Move to beginning of record. * * @param paramReader * XML Reader {@link XMLEventReader}. * @param paramFilter * XML filter {@link EventFilter}. * @param paramIsRecord * Determines if the parser is inside a record or outside. * @return false if event was not found and received end of file * @throws XMLStreamException * In case any parsing error occurs. */ private boolean moveToEvent(final XMLEventReader paramReader, final EventFilter paramFilter, final boolean paramIsRecord) throws XMLStreamException { boolean isTimestamp = false; final DateFormat formatter = new SimpleDateFormat("yyyy.MM.dd HH.mm.ss", Locale.ENGLISH); while (paramReader.hasNext() && !paramFilter.accept(paramReader.peek())) { final XMLEvent event = paramReader.nextEvent(); mCountEvents++; if (isTimestamp && event.isCharacters() && !event.asCharacters().isWhiteSpace()) { isTimestamp = false; try { // Parse timestamp. final String text = event.asCharacters().getData(); final String[] splitted = text.split("T"); final String time = splitted[1].substring(0, splitted[1].length()-1); mKey.setTimestamp(formatter.parse(splitted[0] + " " + time)); } catch (final ParseException e) { LOGWRAPPER.warn(e.getMessage(), e); } } if (paramIsRecord) { // Parser currently is located somewhere after the start of a record (inside a record). mEventWriter.add(event); if (event.isStartElement() && mDate.equals(event.asStartElement().getName())) { isTimestamp = true; } } } return paramReader.hasNext(); } } --------------010709040106050409060306--