hadoop-mapreduce-user mailing list archives

From Kris Nuttycombe <kris.nuttyco...@gmail.com>
Subject Re: Configured & PathFilter
Date Wed, 14 Apr 2010 14:26:58 GMT
On Wed, Apr 14, 2010 at 2:16 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
> Hi Kris,
>
> I am not sure about the mechanics of the Scala language. For the
> following lines of code:
>
>  var conf: Configuration = _
>  lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
>  lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
>  lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
>  lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))
>
>
> I guess these lines of code are executed when the HDFSEventLogPathFilter is
> created, right? In that case, the conf object is null at that time. You'd
> better put this code in the setConf method.

Actually, no: lazy vals in Scala have their initializer expressions
evaluated the first time they are referenced. In this case that happens
in the logging statement of the setConf method; ordinarily it would be
upon the first evaluation of the filter's accept method. You can see
from the log output that the variables have been correctly assigned:

>> >> INF [20100413-08:52:14.308] somegra: List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics), Some(MetricsEvent), None, None)

This log output is being generated by the code in setConf:

>> >>  override def setConf(conf: Configuration): Unit = {
>> >>    this.conf = conf
>> >>    log.info(List(basePath, eventType, start, end).toString)
>> >>  }
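
To illustrate that evaluation order outside of Hadoop, here is a minimal
editorial sketch in plain Java (the memoized accessor below is a hypothetical
stand-in for a Scala lazy val, not Hadoop or Scala library code):

```java
// Hypothetical stand-in for a Scala lazy val (editorial sketch, not Hadoop
// code): basePath() is evaluated on first call and memoized, so it can
// safely depend on a field that is only assigned later, as in setConf.
public class LazyValDemo {
    static String conf = null;           // plays the role of the Configuration field
    private static String cached = null; // memoization slot for basePath()

    // Deferred, memoized accessor, analogous to `lazy val basePath = ...`.
    static String basePath() {
        if (cached == null) cached = "path:" + conf; // runs on first access only
        return cached;
    }

    public static void main(String[] args) {
        // conf is still null here, and basePath() has not been evaluated yet,
        // so no NullPointerException occurs at construction time.
        conf = "/test-batchEventLog/metrics"; // simulates Hadoop calling setConf
        System.out.println(basePath());       // initializer runs now, conf is set
    }
}
```

If basePath() were an eagerly assigned field instead, it would capture the
null conf at class-initialization time, which is the failure mode the quoted
reply was worried about.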

I believe that the incorrect error message results from globStatus
returning null in the case that the path filter fails to accept the
base path.
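
A toy model of that behavior (an editorial sketch; these names are not the
Hadoop API) shows why the message is misleading: the caller gets the same
null whether the path is missing or merely filtered out.

```java
import java.util.function.Predicate;

// Toy model (illustrative names only, NOT the Hadoop API) of the behavior
// described above: the glob returns null both when the base path is missing
// and when the filter rejects it, conflating two distinct failure modes.
public class GlobDemo {
    static String[] glob(String base, boolean exists, Predicate<String> filter) {
        if (!exists || !filter.test(base)) return null; // two failures, one signal
        return new String[] { base + "/metrics_1270671021526" };
    }

    public static void main(String[] args) {
        // A filter that accepts the child files but not the base directory:
        Predicate<String> childrenOnly = p -> p.contains("metrics_");
        String[] matches = glob("/test-batchEventLog/metrics", true, childrenOnly);
        // The caller sees null and reports "Input path does not exist",
        // even though the path exists and only the filter rejected it.
        System.out.println(matches == null
            ? "Input path does not exist (misleading)" : "ok");
    }
}
```

Distinguishing the two cases (null for a missing path, an empty array for a
path the filter rejected) would let callers report an accurate error.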

Kris

>
>
> On Wed, Apr 14, 2010 at 1:02 AM, Kris Nuttycombe <kris.nuttycombe@gmail.com>
> wrote:
>>
>> Ultimately, the issue was that my path filter was accepting the child
>> paths but not the base path. The misleading error message is, I
>> believe, due to ill behavior of globStatus.
>>
>> Kris
>>
>> On Tue, Apr 13, 2010 at 10:37 AM, Kris Nuttycombe
>> <kris.nuttycombe@gmail.com> wrote:
>> > Okay, some more information mined from the source:
>> >
>> > This error is a red herring; it says that the input path does not
>> > exist, but it actually means that no files were selected by the path
>> > filter. Here's the offending code:
>> >
>> >   for (int i=0; i < dirs.length; ++i) {
>> >      Path p = dirs[i];
>> >      FileSystem fs = p.getFileSystem(job.getConfiguration());
>> >      FileStatus[] matches = fs.globStatus(p, inputFilter);
>> >      if (matches == null) {
>> >        errors.add(new IOException("Input path does not exist: " + p));
>> >      } else if (matches.length == 0) {
>> >      ...
>> >   }
>> >
>> > I used this logging to determine the error:
>> >
>> >    log.info("Set input base path as: " + inPath.toString)
>> >    using(FileSystem.get(conf)) {
>> >      fs =>
>> >      log.info((fs.exists(inPath), fs.getFileStatus(inPath).isDir,
>> >        fs.getFileStatus(inPath).getPath).toString)
>> >      val inPaths = FileInputFormat.getInputPaths(job)
>> >      if (inPaths == null) {
>> >        log.error("Input paths were null!")
>> >      } else {
>> >        for (dir <- inPaths) {
>> >          log.info("Searching input paths " + dir.toString)
>> >          val pathFilter = FileInputFormat.getInputPathFilter(job)
>> >          val status = fs.getFileStatus(dir)
>> >          if (status == null) {
>> >            log.error("The input path " + dir.toString + " was not found")
>> >          } else {
>> >            log.info("Attempting to obtain input files from path " +
>> >              status.getPath + " (isDir = " + status.isDir + ")")
>> >          }
>> >          val files = fs.globStatus(dir, pathFilter)
>> >          if (files == null) {
>> >            log.error("No files found in input path!")
>> >          } else {
>> >            for (file <- files) {
>> >              log.info("Accepted file " + file)
>> >            }
>> >          }
>> >        }
>> >      }
>> >    }
>> >
>> > Output:
>> >
>> > INF [20100413-09:35:15.714] reporting: Set input base path as: hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> > INF [20100413-09:35:16.328] reporting: (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
>> > INF [20100413-09:35:16.330] reporting: Searching input paths hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> > INF [20100413-09:35:16.335] somegra: List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics), Some(MetricsEvent), None, None)
>> > INF [20100413-09:35:16.566] reporting: Attempting to obtain input files from path hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics (isDir = true)
>> > ERR [20100413-09:35:16.570] reporting: No files found in input path!
>> > 10/04/13 09:35:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
>> > INF [20100413-09:35:20.214] somegra: List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics), Some(MetricsEvent), None, None)
>> >
>> > It appears that at some point, the semantics of globStatus have
>> > changed such that it never returns the empty array, and always returns
>> > null whether the failure is the absence of the base path or the
>> > inability to find any files.
>> >
>> > Should I file a bug (with patch)?
>> >
>> > Kris
>> >
>> > On Tue, Apr 13, 2010 at 10:07 AM, Kris Nuttycombe
>> > <kris.nuttycombe@gmail.com> wrote:
>> >> Jeff,
>> >>
>> >> Thanks very much for your help; I have at least gotten beyond the NPE
>> >> I was encountering when I was having my path filter extend Configured
>> >> instead of Configurable (though in looking at the code, I'm sort of
>> >> baffled... does setConf potentially get called multiple times?)
>> >> Unfortunately, I'm still stuck. Could you please take a quick look at
>> >> the following code and see if there's anything obvious to you that I
>> >> might be doing wrong?
>> >>
>> >> Here is the code that I use to configure my PathFilter instance, and
>> >> the PathFilter class itself (using Scala; I have starred the lines
>> >> that I believe to be relevant to this question in case it appears
>> >> dense):
>> >>
>> >> object HDFSEventLogPathFilter {
>> >>   // constants for configuration keys
>> >>   val BasePath      = "socialmedia.somegra.reporting.basePath"
>> >>   val EventTypeName = "socialmedia.somegra.reporting.eventType"
>> >>   val StartTime     = "socialmedia.somegra.reporting.startTime"
>> >>   val EndTime       = "socialmedia.somegra.reporting.endTime"
>> >>
>> >>   def configure(job: Job, basePath: Path, start: Option[Timestamp],
>> >>                 end: Option[Timestamp], et: EventType): Unit = {
>> >>     job.getConfiguration.set(BasePath, basePath.toString)
>> >>     job.getConfiguration.set(EventTypeName, et.keyPart)
>> >>     start.foreach(t => job.getConfiguration.set(StartTime, t.epochMillis.toString))
>> >>     end.foreach(t => job.getConfiguration.set(EndTime, t.epochMillis.toString))
>> >>
>> >> *   FileInputFormat.addInputPath(job, basePath)
>> >> *   FileInputFormat.setInputPathFilter(job, classOf[HDFSEventLogPathFilter])
>> >>   }
>> >> }
>> >>
>> >> class HDFSEventLogPathFilter extends PathFilter with Configurable {
>> >>   val log = Logger.get
>> >>   import HDFSEventLog._
>> >>   import HDFSEventLogPathFilter._
>> >>
>> >>   var conf: Configuration = _
>> >>   lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
>> >>   lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
>> >>   lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
>> >>   lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))
>> >>
>> >>   override def setConf(conf: Configuration): Unit = {
>> >>     this.conf = conf
>> >>     log.info(List(basePath, eventType, start, end).toString)
>> >>   }
>> >>
>> >>   override def getConf = conf
>> >>
>> >>   override def accept(p: Path) = {
>> >>     basePath.forall { bp =>
>> >>       bp.equals(p.getParent) && eventType.forall { et =>
>> >>         p.getName.startsWith(et.keyPart) &&
>> >>         start.forall(t => p.getName >= (path(bp, et, t).getName)) &&
>> >>         end.forall(t => p.getName <= (path(bp, et, t).getName))
>> >>       }
>> >>     }
>> >>   }
>> >> }
>> >>
>> >> Here is the calling code:
>> >>
>> >>    log.info("Attempting to set input base path as: " + inPath.toString)
>> >>
>> >>    using(FileSystem.get(conf)) {
>> >>      fs => log.info((fs.exists(inPath), fs.getFileStatus(inPath).isDir,
>> >>        fs.getFileStatus(inPath).getPath).toString)
>> >>    }
>> >>
>> >>    HDFSEventLogPathFilter.configure(job, inPath, df.flatMap(_.start),
>> >> df.flatMap(_.end), MetricsEvent)
>> >>
>> >> Here is the output that I get upon running the program. The first two
>> >> log statements are from the log.info statements just above; the third
>> >> is from the PathFilter itself.
>> >>
>> >> INF [20100413-08:52:11.224] reporting: Attempting to set input base path as: hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> >> INF [20100413-08:52:11.939] reporting: (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
>> >> INF [20100413-08:52:14.308] somegra: List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics), Some(MetricsEvent), None, None)
>> >> 10/04/13 08:52:13 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
>> >> Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> >>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
>> >>        at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
>> >>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
>> >>        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
>> >>        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
>> >>        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
>> >>        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
>> >>        at socialmedia.somegra.reporting.HDFSMapReduceQuery.execute(HDFSMetricsQuery.scala:66)
>> >>
>> >> I'm uncertain as to whether this error is caused by the filter that
>> >> I'm using or something more fundamental, but I'm really struggling to
>> >> understand how the path which has just been determined to exist is not
>> >> seen as existent by the FileInputFormat. As further evidence of
>> >> existence, here's command line output showing the contents of this
>> >> directory:
>> >>
>> >> [knuttycombe@floorshow reporting (kjn-reporting-fhadoop)]$ hadoop fs
>> >> -ls hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> >> Found 7530 items
>> >> -rw-rw-rw-   3 jetty       hadoop    1604399 2010-04-07 13:10
>> >> /test-batchEventLog/metrics/metrics_1270671021526
>> >> -rw-rw-rw-   3 jetty       hadoop    1588231 2010-04-07 13:10
>> >> /test-batchEventLog/metrics/metrics_1270671029284
>> >> -rw-rw-rw-   3 jetty       hadoop    1590922 2010-04-07 13:10
>> >> /test-batchEventLog/metrics/metrics_1270671030131
>> >> -rw-rw-rw-   3 jetty       hadoop    1595381 2010-04-07 13:10
>> >> /test-batchEventLog/metrics/metrics_1270671030416
>> >> ...
>> >>
>> >>  What could I be doing wrong?
>> >>
>> >> Thanks again for your help,
>> >>
>> >> Kris
>> >>
>> >> On Tue, Apr 13, 2010 at 6:49 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
>> >>> Kris,
>> >>>
>> >>> Here's a sample PathFilter which can be configured. The only thing
>> >>> you need to do is add the following line to configure the job:
>> >>>
>> >>> job.getConfiguration().set("pathfilter.pattern", "your_pattern");
>> >>>
>> >>> Not sure whether this is what you want.
>> >>>
>> >>>
>> >>> public class MyPathFilter implements PathFilter, Configurable {
>> >>>
>> >>>     private Configuration conf;
>> >>>
>> >>>     @Override
>> >>>     public boolean accept(Path path) {
>> >>>         return path.getName().contains(conf.get("pathfilter.pattern"));
>> >>>     }
>> >>>
>> >>>     @Override
>> >>>     public Configuration getConf() {
>> >>>         return this.conf;
>> >>>     }
>> >>>
>> >>>     @Override
>> >>>     public void setConf(Configuration conf) {
>> >>>         this.conf = conf;
>> >>>     }
>> >>> }
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Apr 12, 2010 at 4:05 PM, Kris Nuttycombe
>> >>> <kris.nuttycombe@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Whoops, so much for that idea. The Configuration instance being
>> >>>> passed
>> >>>> to setConf is null.
>> >>>>
>> >>>> I am utterly baffled. Is there seriously nobody out there using
>> >>>> PathFilter in this way? Everyone's just using dumb PathFilter
>> >>>> instances that don't have any configurable functionality?
>> >>>>
>> >>>> /me boggles.
>> >>>>
>> >>>> Kris
>> >>>>
>> >>>> On Mon, Apr 12, 2010 at 2:03 PM, Kris Nuttycombe
>> >>>> <kris.nuttycombe@gmail.com> wrote:
>> >>>> > I just dove into the source, and it looks like the PathFilter
>> >>>> > instance is instantiated using ReflectionUtils, and setConf is
>> >>>> > called, so if the resulting PathFilter instance implements
>> >>>> > Configurable, then configuration will be available.
>> >>>> >
>> >>>> > Kris
>> >>>> >
>> >>>> > On Mon, Apr 12, 2010 at 1:52 PM, Kris Nuttycombe
>> >>>> > <kris.nuttycombe@gmail.com> wrote:
>> >>>> >> static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
>> >>>> >>
>> >>>> >> This indicates that reflection will be used to instantiate the
>> >>>> >> required PathFilter object, and I need to be able to access the
>> >>>> >> minimum and maximum date for a given run. I don't want to have to
>> >>>> >> implement a separate PathFilter class for each set of dates,
>> >>>> >> obviously.
>> >>>> >>
>> >>>> >> Thanks,
>> >>>> >>
>> >>>> >> Kris
>> >>>> >>
>> >>>> >> On Mon, Apr 12, 2010 at 9:35 AM, Jeff Zhang <zjffdu@gmail.com>
>> >>>> >> wrote:
>> >>>> >>>  Hi Kris,
>> >>>> >>>
>> >>>> >>> Do you mean you want to use the PathFilter in a map or reduce
>> >>>> >>> task? Or do you mean using the PathFilter in the InputFormat?
>> >>>> >>> I guess you mean the second case; if so, you only need to call
>> >>>> >>> FileInputFormat.setInputPathFilter(,) to provide the filter
>> >>>> >>> information.
>> >>>> >>>
>> >>>> >>>
>> >>>> >>> On Mon, Apr 12, 2010 at 8:13 AM, Kris Nuttycombe
>> >>>> >>> <kris.nuttycombe@gmail.com>
>> >>>> >>> wrote:
>> >>>> >>>>
>> >>>> >>>> Hi, all, quick question about using PathFilter.
>> >>>> >>>>
>> >>>> >>>> Is there any way to provide information from the job
>> >>>> >>>> configuration to a PathFilter instance? In my case, I want to
>> >>>> >>>> limit the date range of the files being selected by the filter,
>> >>>> >>>> and I don't want to have to hard-code a separate PathFilter
>> >>>> >>>> instance for each date range I'm interested in, obviously. If I
>> >>>> >>>> make my PathFilter extend Configured, will it do the right
>> >>>> >>>> thing?
>> >>>> >>>>
>> >>>> >>>> Thanks!
>> >>>> >>>>
>> >>>> >>>> Kris
>> >>>> >>>
>> >>>> >>>
>> >>>> >>>
>> >>>> >>> --
>> >>>> >>> Best Regards
>> >>>> >>>
>> >>>> >>> Jeff Zhang
>> >>>> >>>
>> >>>> >>
>> >>>> >
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Best Regards
>> >>>
>> >>> Jeff Zhang
>> >>>
>> >>
>> >
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
