hadoop-mapreduce-user mailing list archives

From Kris Nuttycombe <kris.nuttyco...@gmail.com>
Subject Re: Configured & PathFilter
Date Tue, 13 Apr 2010 17:02:27 GMT
Ultimately, the issue was that my path filter was accepting the child
paths but not the base path. The misleading error message remains a
problem, though, which I believe is due to ill behavior of globStatus.
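
For anyone who hits the same thing: the fix on my end was to have
accept() admit the base path itself in addition to its children, since
the filter is applied to the base path before anything under it is
listed. A minimal Java sketch of the idea (the class name and config
key are made up for illustration, not my actual filter):

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class BasePathAwareFilter implements PathFilter, Configurable {
    private Configuration conf;
    private Path basePath;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        if (conf != null) {
            // "myapp.basePath" is a hypothetical key for this sketch
            String bp = conf.get("myapp.basePath");
            if (bp != null) {
                basePath = new Path(bp);
            }
        }
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public boolean accept(Path p) {
        // Accept the base path itself; rejecting it is what produced
        // the misleading "Input path does not exist" error for me.
        if (p.equals(basePath)) {
            return true;
        }
        // Then filter the children (add date/name checks here).
        return basePath != null && basePath.equals(p.getParent());
    }
}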

Kris

On Tue, Apr 13, 2010 at 10:37 AM, Kris Nuttycombe
<kris.nuttycombe@gmail.com> wrote:
> Okay, some more information mined from the source:
>
> This error is a red herring: it says that the input path does not
> exist, when in fact it means that no files were selected by the path
> filter. Here's the offending code:
>
>   for (int i=0; i < dirs.length; ++i) {
>      Path p = dirs[i];
>      FileSystem fs = p.getFileSystem(job.getConfiguration());
>      FileStatus[] matches = fs.globStatus(p, inputFilter);
>      if (matches == null) {
>        errors.add(new IOException("Input path does not exist: " + p));
>      } else if (matches.length == 0) {
>      ...
>   }
>
> I used this logging to determine the error:
>
>    log.info("Set input base path as: " + inPath.toString)
>    using(FileSystem.get(conf)) {
>      fs =>
>      log.info((fs.exists(inPath), fs.getFileStatus(inPath).isDir,
>               fs.getFileStatus(inPath).getPath).toString)
>      val inPaths = FileInputFormat.getInputPaths(job)
>      if (inPaths == null) {
>        log.error("Input paths were null!")
>      } else {
>        for (dir <- inPaths) {
>          log.info("Searching input paths " + dir.toString)
>          val pathFilter = FileInputFormat.getInputPathFilter(job)
>          val status = fs.getFileStatus(dir)
>          if (status == null) {
>            log.error("The input path " + dir.toString + " was not found")
>          } else {
>            log.info("Attempting to obtain input files from path " +
>                     status.getPath + " (isDir = " + status.isDir + ")")
>          }
>          val files = fs.globStatus(dir, pathFilter)
>          if (files == null) {
>            log.error("No files found in input path!")
>          } else {
>            for (file <- files) {
>              log.info("Accepted file " + file)
>            }
>          }
>        }
>      }
>    }
>
> Output:
>
> INF [20100413-09:35:15.714] reporting: Set input base path as:
> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> INF [20100413-09:35:16.328] reporting:
> (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
> INF [20100413-09:35:16.330] reporting: Searching input paths
> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> INF [20100413-09:35:16.335] somegra:
> List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> Some(MetricsEvent), None, None)
> INF [20100413-09:35:16.566] reporting: Attempting to obtain input
> files from path
> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
> (isDir = true)
> ERR [20100413-09:35:16.570] reporting: No files found in input path!
> 10/04/13 09:35:19 WARN mapred.JobClient: Use GenericOptionsParser for
> parsing the arguments. Applications should implement Tool for the
> same.
> INF [20100413-09:35:20.214] somegra:
> List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
> Some(MetricsEvent), None, None)
>
> It appears that at some point the semantics of globStatus changed such
> that it never returns an empty array: it returns null both when the
> base path is absent and when the filter matches no files, so the two
> failure modes are indistinguishable to the caller.
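>
> If I were to patch it, a minimal sketch might look like the following
> (untested; a real fix would also need to handle a p that contains glob
> characters, where a plain fs.exists() check isn't meaningful):
>
>   for (int i = 0; i < dirs.length; ++i) {
>     Path p = dirs[i];
>     FileSystem fs = p.getFileSystem(job.getConfiguration());
>     FileStatus[] matches = fs.globStatus(p, inputFilter);
>     if (matches == null) {
>       // distinguish a genuinely missing path from a filter that
>       // rejected everything, so the message isn't a red herring
>       if (!fs.exists(p)) {
>         errors.add(new IOException("Input path does not exist: " + p));
>       } else {
>         errors.add(new IOException(
>             "Input path filter matched no files in: " + p));
>       }
>     } else if (matches.length == 0) {
>       // existing empty-match handling
>     }
>   }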
>
> Should I file a bug (with a patch)?
>
> Kris
>
> On Tue, Apr 13, 2010 at 10:07 AM, Kris Nuttycombe
> <kris.nuttycombe@gmail.com> wrote:
>> Jeff,
>>
>> Thanks very much for your help; I have at least gotten beyond the NPE
>> I was encountering when my path filter extended Configured instead of
>> implementing Configurable (though looking at the code, I'm sort of
>> baffled... does setConf potentially get called multiple times?).
>> Unfortunately, I'm still stuck. Could you please take a quick look at
>> the following code and see if there's anything obvious that I might
>> be doing wrong?
>>
>> Here is the code that I use to configure my PathFilter instance, and
>> the PathFilter class itself (using Scala; I have starred the lines
>> that I believe to be relevant to this question in case it appears
>> dense):
>>
>> object HDFSEventLogPathFilter {
>>   // constants for configuration keys
>>   val BasePath      = "socialmedia.somegra.reporting.basePath"
>>   val EventTypeName = "socialmedia.somegra.reporting.eventType"
>>   val StartTime     = "socialmedia.somegra.reporting.startTime"
>>   val EndTime       = "socialmedia.somegra.reporting.endTime"
>>
>>   def configure(job: Job, basePath: Path, start: Option[Timestamp],
>>                 end: Option[Timestamp], et: EventType): Unit = {
>>     job.getConfiguration.set(BasePath, basePath.toString)
>>     job.getConfiguration.set(EventTypeName, et.keyPart)
>>     start.foreach(t => job.getConfiguration.set(StartTime, t.epochMillis.toString))
>>     end.foreach(t => job.getConfiguration.set(EndTime, t.epochMillis.toString))
>>
>> *   FileInputFormat.addInputPath(job, basePath)
>> *   FileInputFormat.setInputPathFilter(job, classOf[HDFSEventLogPathFilter])
>>   }
>> }
>>
>> class HDFSEventLogPathFilter extends PathFilter with Configurable {
>>   val log = Logger.get
>>   import HDFSEventLog._
>>   import HDFSEventLogPathFilter._
>>
>>   var conf: Configuration = _
>>   lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
>>   lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
>>   lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
>>   lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))
>>
>>   override def setConf(conf: Configuration): Unit = {
>>     this.conf = conf
>>     log.info(List(basePath, eventType, start, end).toString)
>>   }
>>
>>   override def getConf = conf
>>
>>   override def accept(p: Path) = {
>>     basePath.forall { bp =>
>>       bp.equals(p.getParent) && eventType.forall { et =>
>>         p.getName.startsWith(et.keyPart) &&
>>         start.forall(t => p.getName >= path(bp, et, t).getName) &&
>>         end.forall(t => p.getName <= path(bp, et, t).getName)
>>       }
>>     }
>>   }
>> }
>>
>> Here is the calling code:
>>
>>    log.info("Attempting to set input base path as: " + inPath.toString)
>>
>>    using(FileSystem.get(conf)) {
>>      fs => log.info((fs.exists(inPath), fs.getFileStatus(inPath).isDir,
>>                     fs.getFileStatus(inPath).getPath).toString)
>>    }
>>
>>    HDFSEventLogPathFilter.configure(job, inPath, df.flatMap(_.start),
>>                                     df.flatMap(_.end), MetricsEvent)
>>
>> Here is the output that I get upon running the program. The first two
>> log statements are from the log.info statements just above; the third
>> is from the PathFilter itself.
>>
>> INF [20100413-08:52:11.224] reporting: Attempting to set input base
>> path as: hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> INF [20100413-08:52:11.939] reporting:
>> (true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
>> INF [20100413-08:52:14.308] somegra:
>> List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
>> Some(MetricsEvent), None, None)
>> 10/04/13 08:52:13 WARN mapred.JobClient: Use GenericOptionsParser for
>> parsing the arguments. Applications should implement Tool for the
>> same.
>> Exception in thread "main"
>> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input
>> path does not exist:
>> hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
>>        at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
>>        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
>>        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
>>        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
>>        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
>>        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
>>        at socialmedia.somegra.reporting.HDFSMapReduceQuery.execute(HDFSMetricsQuery.scala:66)
>>
>> I'm uncertain whether this error is caused by the filter I'm using or
>> by something more fundamental, but I'm really struggling to understand
>> how a path that has just been determined to exist is not seen as
>> existing by the FileInputFormat. As further evidence of its existence,
>> here's command-line output showing the contents of this directory:
>>
>> [knuttycombe@floorshow reporting (kjn-reporting-fhadoop)]$ hadoop fs
>> -ls hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
>> Found 7530 items
>> -rw-rw-rw-   3 jetty       hadoop    1604399 2010-04-07 13:10
>> /test-batchEventLog/metrics/metrics_1270671021526
>> -rw-rw-rw-   3 jetty       hadoop    1588231 2010-04-07 13:10
>> /test-batchEventLog/metrics/metrics_1270671029284
>> -rw-rw-rw-   3 jetty       hadoop    1590922 2010-04-07 13:10
>> /test-batchEventLog/metrics/metrics_1270671030131
>> -rw-rw-rw-   3 jetty       hadoop    1595381 2010-04-07 13:10
>> /test-batchEventLog/metrics/metrics_1270671030416
>> ...
>>
>>  What could I be doing wrong?
>>
>> Thanks again for your help,
>>
>> Kris
>>
>> On Tue, Apr 13, 2010 at 6:49 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
>>> Kris,
>>>
>>> Here's a sample PathFilter that can be configured. The only thing you need
>>> to do is add the following line to configure the job:
>>>
>>> job.getConfiguration().set("pathfilter.pattern", "your_pattern");
>>>
>>> Not sure whether this is what you want.
>>>
>>>
>>> public class MyPathFilter implements PathFilter, Configurable {
>>>
>>>     private Configuration conf;
>>>
>>>     @Override
>>>     public boolean accept(Path path) {
>>>         return path.getName().contains(conf.get("pathfilter.pattern"));
>>>     }
>>>
>>>     @Override
>>>     public Configuration getConf() {
>>>         return this.conf;
>>>     }
>>>
>>>     @Override
>>>     public void setConf(Configuration conf) {
>>>         this.conf = conf;
>>>     }
>>> }
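>>>
>>> To wire it up on the driver side, something like this should work (a
>>> minimal sketch; the input path and pattern value are placeholders):
>>>
>>> Job job = new Job(conf);
>>> FileInputFormat.addInputPath(job, new Path("/your/input"));
>>> FileInputFormat.setInputPathFilter(job, MyPathFilter.class);
>>> job.getConfiguration().set("pathfilter.pattern", "your_pattern");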
>>>
>>>
>>>
>>> On Mon, Apr 12, 2010 at 4:05 PM, Kris Nuttycombe <kris.nuttycombe@gmail.com>
>>> wrote:
>>>>
>>>> Whoops, so much for that idea. The Configuration instance being passed
>>>> to setConf is null.
>>>>
>>>> I am utterly baffled. Is there seriously nobody out there using
>>>> PathFilter in this way? Everyone's just using dumb PathFilter
>>>> instances that don't have any configurable functionality?
>>>>
>>>> /me boggles.
>>>>
>>>> Kris
>>>>
>>>> On Mon, Apr 12, 2010 at 2:03 PM, Kris Nuttycombe
>>>> <kris.nuttycombe@gmail.com> wrote:
>>>> > I just dove into the source, and it looks like the PathFilter instance
>>>> > is instantiated using ReflectionUtils; if the resulting PathFilter
>>>> > instance implements Configurable, setConf is called on it, so the
>>>> > configuration will be available.
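>>>> >
>>>> > For reference, the instantiation is roughly (from memory, so the
>>>> > exact source may differ):
>>>> >
>>>> >   PathFilter filter = ReflectionUtils.newInstance(filterClass, conf);
>>>> >   // newInstance() invokes setConf(conf) on the new instance
>>>> >   // whenever it implements Configurable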
>>>> >
>>>> > Kris
>>>> >
>>>> > On Mon, Apr 12, 2010 at 1:52 PM, Kris Nuttycombe
>>>> > <kris.nuttycombe@gmail.com> wrote:
>>>> >> static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
>>>> >>
>>>> >> This indicates that reflection will be used to instantiate the
>>>> >> required PathFilter object, and I need to be able to access the
>>>> >> minimum and maximum date for a given run. I don't want to have to
>>>> >> implement a separate PathFilter class for each set of dates,
>>>> >> obviously.
>>>> >>
>>>> >> Thanks,
>>>> >>
>>>> >> Kris
>>>> >>
>>>> >> On Mon, Apr 12, 2010 at 9:35 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
>>>> >>>  Hi Kris,
>>>> >>>
>>>> >>> Do you mean you want to use the PathFilter in a map or reduce task?
>>>> >>> Or do you mean using the PathFilter in the InputFormat?
>>>> >>> I guess you mean the second case; if so, you only need to call
>>>> >>> FileInputFormat.setInputPathFilter(...) to provide the filter
>>>> >>> information.
>>>> >>>
>>>> >>>
>>>> >>> On Mon, Apr 12, 2010 at 8:13 AM, Kris Nuttycombe
>>>> >>> <kris.nuttycombe@gmail.com>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Hi, all, quick question about using PathFilter.
>>>> >>>>
>>>> >>>> Is there any way to provide information from the job configuration to
>>>> >>>> a PathFilter instance? In my case, I want to limit the date range of
>>>> >>>> the files being selected by the filter, and don't want to have to
>>>> >>>> hard-code a separate PathFilter instance for each date range I'm
>>>> >>>> interested in, obviously. If I make my PathFilter extend Configured,
>>>> >>>> will it do the right thing?
>>>> >>>>
>>>> >>>> Thanks!
>>>> >>>>
>>>> >>>> Kris
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Best Regards
>>>> >>>
>>>> >>> Jeff Zhang
>>>> >>>
>>>> >>
>>>> >
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
