hadoop-mapreduce-user mailing list archives

From Jeff Zhang <zjf...@gmail.com>
Subject Re: Configured & PathFilter
Date Wed, 14 Apr 2010 14:34:04 GMT
Kris,

Try using /test-batchEventLog/metrics/* as the input path, i.e. append an
asterisk glob to the directory.
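
For example, against the configure method from your code (a sketch, untested):

  FileInputFormat.addInputPath(job,
    new Path("hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics/*"))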


On Wed, Apr 14, 2010 at 7:26 AM, Kris Nuttycombe <kris.nuttycombe@gmail.com> wrote:

> On Wed, Apr 14, 2010 at 2:16 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
> > Hi Kris,
> >
> > I am not sure about the mechanics of the Scala language. For the
> > following lines of code:
> >
> >  var conf: Configuration = _
> >  lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
> >  lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
> >  lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
> >  lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))
> >
> >
> > I guess these lines of code are executed when the HDFSEventLogPathFilter
> > is created, right? In that case, the conf object is null at that time.
> > You'd better put this code in the setConf method.
>
> Actually not; lazy vals in Scala have their assignment expressions
> evaluated the first time they are referenced. In this case that's in the
> logging statement of the setConf method, but ordinarily it'd be upon the
> first evaluation of the filter's accept method. You can see from the log
> output that the variables have been correctly assigned:
>
> >> >> INF [20100413-08:52:14.308] somegra:
> >> >> List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> >> >> Some(MetricsEvent), None, None)
>
> This log output is being generated by the code in setConf:
>
> >> >>  override def setConf(conf: Configuration): Unit = {
> >> >>    this.conf = conf
> >> >>    log.info(List(basePath, eventType, start, end).toString)
> >> >>  }
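>
> (As a minimal illustration of the lazy val timing, assuming a plain Scala
> session, hypothetical names:
>
>   class Holder { var conf: String = _; lazy val len = conf.length }
>   val h = new Holder   // the body of len has not run yet; conf is still null
>   h.conf = "abc"
>   h.len                // evaluates conf.length now, yielding 3
>
> Each lazy val sees whatever conf holds at first reference, not at
> construction time.)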
>
> I believe that the incorrect error message results from globStatus
> returning null in the case that the path filter fails to accept the
> base path.
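>
> (That belief is easy to test in isolation; a sketch using a filter that
> rejects everything:
>
>   val rejectAll = new PathFilter { def accept(p: Path) = false }
>   // if my reading is right, this yields null rather than an empty array,
>   // because the filter rejects the base path itself
>   fs.globStatus(new Path("/test-batchEventLog/metrics"), rejectAll)
> )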
>
> Kris
>
> >
> >
> > On Wed, Apr 14, 2010 at 1:02 AM, Kris Nuttycombe <kris.nuttycombe@gmail.com> wrote:
> >>
> >> Ultimately, the issue was that my path filter was accepting the child
> >> paths but not the base path. But the misleading error message remains
> >> (which I believe is due to ill behavior of globStatus).
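> >>
> >> (The change amounts to accepting the base directory itself in addition
> >> to matching children; a sketch against the accept method quoted further
> >> down, with the added clause marked, though my exact fix may differ:
> >>
> >>   override def accept(p: Path) = {
> >>     basePath.forall { bp =>
> >>       bp.equals(p) ||   // added: accept the base path itself
> >>       (bp.equals(p.getParent) && eventType.forall { et =>
> >>         p.getName.startsWith(et.keyPart) &&
> >>         start.forall(t => p.getName >= path(bp, et, t).getName) &&
> >>         end.forall(t => p.getName <= path(bp, et, t).getName)
> >>       })
> >>     }
> >>   }
> >> )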
> >>
> >> Kris
> >>
> >> On Tue, Apr 13, 2010 at 10:37 AM, Kris Nuttycombe
> >> <kris.nuttycombe@gmail.com> wrote:
> >> > Okay, some more information mined from the source:
> >> >
> >> > This error is a red herring; it says that the input path does not
> >> > exist, when it actually means that no files were selected by the path
> >> > filter. Here's the offending code:
> >> >
> >> >   for (int i=0; i < dirs.length; ++i) {
> >> >      Path p = dirs[i];
> >> >      FileSystem fs = p.getFileSystem(job.getConfiguration());
> >> >      FileStatus[] matches = fs.globStatus(p, inputFilter);
> >> >      if (matches == null) {
> >> >        errors.add(new IOException("Input path does not exist: " + p));
> >> >      } else if (matches.length == 0) {
> >> >      ...
> >> >   }
> >> >
> >> > I used this logging to determine the error:
> >> >
> >> >    log.info("Set input base path as: " + inPath.toString)
> >> >    using(FileSystem.get(conf)) {
> >> >      fs =>
> >> >      log.info((fs.exists(inPath), fs.getFileStatus(inPath).isDir,
> >> > fs.getFileStatus(inPath).getPath).toString)
> >> >      val inPaths = FileInputFormat.getInputPaths(job)
> >> >      if (inPaths == null) {
> >> >        log.error("Input paths were null!")
> >> >      } else {
> >> >        for (dir <- inPaths) {
> >> >          log.info("Searching input paths " + dir.toString)
> >> >          val pathFilter = FileInputFormat.getInputPathFilter(job)
> >> >          val status = fs.getFileStatus(dir)
> >> >          if (status == null) {
> >> >            log.error("The input path " + dir.toString + " was not
> >> > found")
> >> >          } else {
> >> >            log.info("Attempting to obtain input files from path " +
> >> > status.getPath + " (isDir = " + status.isDir + ")")
> >> >          }
> >> >          val files = fs.globStatus(dir, pathFilter)
> >> >          if (files == null) {
> >> >            log.error("No files found in input path!")
> >> >          } else {
> >> >            for (file <- files) {
> >> >              log.info("Accepted file " + file)
> >> >            }
> >> >          }
> >> >        }
> >> >      }
> >> >    }
> >> >
> >> > Output:
> >> >
> >> > INF [20100413-09:35:15.714] reporting: Set input base path as:
> >> > hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> > INF [20100413-09:35:16.328] reporting:
> >> > (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
> >> > INF [20100413-09:35:16.330] reporting: Searching input paths
> >> > hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> > INF [20100413-09:35:16.335] somegra:
> >> > List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> >> > Some(MetricsEvent), None, None)
> >> > INF [20100413-09:35:16.566] reporting: Attempting to obtain input
> >> > files from path
> >> > hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> > (isDir = true)
> >> > ERR [20100413-09:35:16.570] reporting: No files found in input path!
> >> > 10/04/13 09:35:19 WARN mapred.JobClient: Use GenericOptionsParser for
> >> > parsing the arguments. Applications should implement Tool for the same.
> >> > INF [20100413-09:35:20.214] somegra:
> >> > List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> >> > Some(MetricsEvent), None, None)
> >> >
> >> > It appears that at some point the semantics of globStatus changed
> >> > such that it never returns an empty array, and instead always returns
> >> > null, whether the failure is the absence of the base path or the
> >> > inability to find any matching files.
> >> >
> >> > Should I file a bug (with a patch)?
> >> >
> >> > Kris
> >> >
> >> > On Tue, Apr 13, 2010 at 10:07 AM, Kris Nuttycombe
> >> > <kris.nuttycombe@gmail.com> wrote:
> >> >> Jeff,
> >> >>
> >> >> Thanks very much for your help; I have at least gotten beyond the NPE
> >> >> I was encountering when I had my path filter extend Configured
> >> >> instead of implementing Configurable (though in looking at the code,
> >> >> I'm sort of baffled... does setConf potentially get called multiple
> >> >> times?) Unfortunately, I'm still stuck. Could you please take a quick
> >> >> look at the following code and see if there's anything obvious to you
> >> >> that I might be doing wrong?
> >> >>
> >> >> Here is the code that I use to configure my PathFilter instance, and
> >> >> the PathFilter class itself (using Scala; I have starred the lines
> >> >> that I believe to be relevant to this question in case it appears
> >> >> dense):
> >> >>
> >> >> object HDFSEventLogPathFilter {
> >> >>  // constants for configuration keys
> >> >>  val BasePath      = "socialmedia.somegra.reporting.basePath"
> >> >>  val EventTypeName = "socialmedia.somegra.reporting.eventType"
> >> >>  val StartTime     = "socialmedia.somegra.reporting.startTime"
> >> >>  val EndTime       = "socialmedia.somegra.reporting.endTime"
> >> >>
> >> >>  def configure(job: Job, basePath: Path, start: Option[Timestamp],
> >> >>                end: Option[Timestamp], et: EventType): Unit = {
> >> >>    job.getConfiguration.set(BasePath, basePath.toString)
> >> >>    job.getConfiguration.set(EventTypeName, et.keyPart)
> >> >>    start.foreach(t => job.getConfiguration.set(StartTime, t.epochMillis.toString))
> >> >>    end.foreach(t => job.getConfiguration.set(EndTime, t.epochMillis.toString))
> >> >>
> >> >> *    FileInputFormat.addInputPath(job, basePath)
> >> >> *    FileInputFormat.setInputPathFilter(job, classOf[HDFSEventLogPathFilter])
> >> >>  }
> >> >> }
> >> >>
> >> >> class HDFSEventLogPathFilter extends PathFilter with Configurable {
> >> >>  val log = Logger.get
> >> >>  import HDFSEventLog._
> >> >>  import HDFSEventLogPathFilter._
> >> >>
> >> >>  var conf: Configuration = _
> >> >>  lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
> >> >>  lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
> >> >>  lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
> >> >>  lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))
> >> >>
> >> >>  override def setConf(conf: Configuration): Unit = {
> >> >>    this.conf = conf
> >> >>    log.info(List(basePath, eventType, start, end).toString)
> >> >>  }
> >> >>
> >> >>  override def getConf = conf
> >> >>
> >> >>  override def accept(p: Path) = {
> >> >>    basePath.forall {
> >> >>      bp => bp.equals(p.getParent) && eventType.forall { et =>
> >> >>        p.getName.startsWith(et.keyPart) &&
> >> >>        start.forall(t => p.getName >= (path(bp, et, t).getName)) &&
> >> >>        end.forall(t => p.getName <= (path(bp, et, t).getName))
> >> >>      }
> >> >>    }
> >> >>  }
> >> >> }
> >> >>
> >> >> Here is the calling code:
> >> >>
> >> >>    log.info("Attempting to set input base path as: " +
> inPath.toString)
> >> >>
> >> >>    using(FileSystem.get(conf)) {
> >> >>      fs => log.info((fs.exists(inPath),
> >> >> fs.getFileStatus(inPath).isDir,
> >> >> fs.getFileStatus(inPath).getPath).toString)
> >> >>    }
> >> >>
> >> >>    HDFSEventLogPathFilter.configure(job, inPath, df.flatMap(_.start),
> >> >> df.flatMap(_.end), MetricsEvent)
> >> >>
> >> >> Here is the output that I get upon running the program. The first two
> >> >> log statements are from the log.info statements just above; the third
> >> >> is from the PathFilter itself.
> >> >>
> >> >> INF [20100413-08:52:11.224] reporting: Attempting to set input base
> >> >> path as:
> >> >> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> >> INF [20100413-08:52:11.939] reporting:
> >> >> (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
> >> >> INF [20100413-08:52:14.308] somegra:
> >> >> List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> >> >> Some(MetricsEvent), None, None)
> >> >> 10/04/13 08:52:13 WARN mapred.JobClient: Use GenericOptionsParser for
> >> >> parsing the arguments. Applications should implement Tool for the same.
> >> >> Exception in thread "main"
> >> >> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input
> >> >> path does not exist:
> >> >> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> >>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
> >> >>        at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
> >> >>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
> >> >>        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
> >> >>        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
> >> >>        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
> >> >>        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
> >> >>        at socialmedia.somegra.reporting.HDFSMapReduceQuery.execute(HDFSMetricsQuery.scala:66)
> >> >>
> >> >> I'm uncertain as to whether this error is caused by the filter that
> >> >> I'm using or something more fundamental, but I'm really struggling to
> >> >> understand how the path which has just been determined to exist is
> >> >> not seen as existent by the FileInputFormat. As further evidence of
> >> >> existence, here's command line output showing the contents of this
> >> >> directory:
> >> >>
> >> >> [knuttycombe@floorshow reporting (kjn-reporting-fhadoop)]$ hadoop fs
> >> >> -ls hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> >> >> Found 7530 items
> >> >> -rw-rw-rw-   3 jetty       hadoop    1604399 2010-04-07 13:10
> >> >> /test-batchEventLog/metrics/metrics_1270671021526
> >> >> -rw-rw-rw-   3 jetty       hadoop    1588231 2010-04-07 13:10
> >> >> /test-batchEventLog/metrics/metrics_1270671029284
> >> >> -rw-rw-rw-   3 jetty       hadoop    1590922 2010-04-07 13:10
> >> >> /test-batchEventLog/metrics/metrics_1270671030131
> >> >> -rw-rw-rw-   3 jetty       hadoop    1595381 2010-04-07 13:10
> >> >> /test-batchEventLog/metrics/metrics_1270671030416
> >> >> ...
> >> >>
> >> >> What could I be doing wrong?
> >> >>
> >> >> Thanks again for your help,
> >> >>
> >> >> Kris
> >> >>
> >> >> On Tue, Apr 13, 2010 at 6:49 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
> >> >>> Kris,
> >> >>>
> >> >>> Here's a sample PathFilter which can be configured. The only thing
> >> >>> you need to do is add the following line to configure the job:
> >> >>>
> >> >>> job.getConfiguration().set("pathfilter.pattern", "your_pattern");
> >> >>>
> >> >>> Not sure whether this is what you want.
> >> >>>
> >> >>>
> >> >>> public class MyPathFilter implements PathFilter, Configurable {
> >> >>>
> >> >>>     private Configuration conf;
> >> >>>
> >> >>>     public MyPathFilter() {
> >> >>>     }
> >> >>>
> >> >>>     @Override
> >> >>>     public boolean accept(Path path) {
> >> >>>         return path.getName().contains(conf.get("pathfilter.pattern"));
> >> >>>     }
> >> >>>
> >> >>>     @Override
> >> >>>     public Configuration getConf() {
> >> >>>         return this.conf;
> >> >>>     }
> >> >>>
> >> >>>     @Override
> >> >>>     public void setConf(Configuration conf) {
> >> >>>         this.conf = conf;
> >> >>>     }
> >> >>> }
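> >> >>>
> >> >>> (Hooking it up from Scala job setup would presumably look like the
> >> >>> following, where "metrics_" is only an example pattern:
> >> >>>
> >> >>>   job.getConfiguration.set("pathfilter.pattern", "metrics_")
> >> >>>   FileInputFormat.setInputPathFilter(job, classOf[MyPathFilter])
> >> >>> )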
> >> >>>
> >> >>>
> >> >>>
> >> >>> On Mon, Apr 12, 2010 at 4:05 PM, Kris Nuttycombe
> >> >>> <kris.nuttycombe@gmail.com>
> >> >>> wrote:
> >> >>>>
> >> >>>> Whoops, so much for that idea. The Configuration instance being
> >> >>>> passed
> >> >>>> to setConf is null.
> >> >>>>
> >> >>>> I am utterly baffled. Is there seriously nobody out there using
> >> >>>> PathFilter in this way? Everyone's just using dumb PathFilter
> >> >>>> instances that don't have any configurable functionality?
> >> >>>>
> >> >>>> /me boggles.
> >> >>>>
> >> >>>> Kris
> >> >>>>
> >> >>>> On Mon, Apr 12, 2010 at 2:03 PM, Kris Nuttycombe
> >> >>>> <kris.nuttycombe@gmail.com> wrote:
> >> >>>> > I just dove into the source, and it looks like the PathFilter
> >> >>>> > instance is instantiated using ReflectionUtils, and setConf is
> >> >>>> > called, so if the resulting PathFilter instance implements
> >> >>>> > Configurable, then configuration will be available.
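> >> >>>> >
> >> >>>> > (Roughly what happens inside Hadoop, as a sketch; filterClass
> >> >>>> > stands in for whatever was registered via setInputPathFilter:
> >> >>>> >
> >> >>>> >   val filter = ReflectionUtils.newInstance(filterClass, conf)
> >> >>>> >   // newInstance calls setConf(conf) on the new object when it
> >> >>>> >   // implements Configurable, so conf is set before accept() runs
> >> >>>> > )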
> >> >>>> >
> >> >>>> > Kris
> >> >>>> >
> >> >>>> > On Mon, Apr 12, 2010 at 1:52 PM, Kris Nuttycombe
> >> >>>> > <kris.nuttycombe@gmail.com> wrote:
> >> >>>> >> static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
> >> >>>> >>
> >> >>>> >> This indicates that reflection will be used to instantiate the
> >> >>>> >> required PathFilter object, and I need to be able to access the
> >> >>>> >> minimum and maximum date for a given run. I don't want to have to
> >> >>>> >> implement a separate PathFilter class for each set of dates,
> >> >>>> >> obviously.
> >> >>>> >>
> >> >>>> >> Thanks,
> >> >>>> >>
> >> >>>> >> Kris
> >> >>>> >>
> >> >>>> >> On Mon, Apr 12, 2010 at 9:35 AM, Jeff Zhang <zjffdu@gmail.com>
> >> >>>> >> wrote:
> >> >>>> >>> Hi Kris,
> >> >>>> >>>
> >> >>>> >>> Do you mean you want to use the PathFilter in a map or reduce
> >> >>>> >>> task? Or do you mean using the PathFilter in an InputFormat?
> >> >>>> >>> I guess you mean the second case; if so, you only need to call
> >> >>>> >>> FileInputFormat.setInputPathFilter(,) to provide the filter
> >> >>>> >>> information.
> >> >>>> >>>
> >> >>>> >>>
> >> >>>> >>> On Mon, Apr 12, 2010 at 8:13 AM, Kris Nuttycombe
> >> >>>> >>> <kris.nuttycombe@gmail.com>
> >> >>>> >>> wrote:
> >> >>>> >>>>
> >> >>>> >>>> Hi, all, quick question about using PathFilter.
> >> >>>> >>>>
> >> >>>> >>>> Is there any way to provide information from the job
> >> >>>> >>>> configuration to a PathFilter instance? In my case, I want to
> >> >>>> >>>> limit the date range of the files being selected by the filter,
> >> >>>> >>>> and don't want to have to hard-code a separate PathFilter
> >> >>>> >>>> instance for each date range I'm interested in, obviously. If I
> >> >>>> >>>> make my PathFilter extend Configured, will it do the right
> >> >>>> >>>> thing?
> >> >>>> >>>>
> >> >>>> >>>> Thanks!
> >> >>>> >>>>
> >> >>>> >>>> Kris
> >> >>>> >>>
> >> >>>> >>>
> >> >>>> >>>
> >> >>>> >>> --
> >> >>>> >>> Best Regards
> >> >>>> >>>
> >> >>>> >>> Jeff Zhang
> >> >>>> >>>
> >> >>>> >>
> >> >>>> >
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> Best Regards
> >> >>>
> >> >>> Jeff Zhang
> >> >>>
> >> >>
> >> >
> >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>



-- 
Best Regards

Jeff Zhang
