hadoop-mapreduce-user mailing list archives

From Kris Nuttycombe <kris.nuttyco...@gmail.com>
Subject Re: Configured & PathFilter
Date Tue, 13 Apr 2010 16:07:10 GMT
Jeff,

Thanks very much for your help; I have at least gotten beyond the NPE
I was encountering when I had my path filter extend Configured instead
of implementing Configurable (though, looking at the code, I'm sort of
baffled... does setConf potentially get called multiple times?).
Unfortunately, I'm still stuck. Could you please take a quick look at
the following code and see if there's anything obvious that I might be
doing wrong?

Here is the code that I use to configure my PathFilter instance, and
the PathFilter class itself (using Scala; I have starred the lines
that I believe to be relevant to this question in case it appears
dense):

object HDFSEventLogPathFilter {
  // constants for configuration keys
  val BasePath      = "socialmedia.somegra.reporting.basePath"
  val EventTypeName = "socialmedia.somegra.reporting.eventType"
  val StartTime     = "socialmedia.somegra.reporting.startTime"
  val EndTime       = "socialmedia.somegra.reporting.endTime"

  def configure(job: Job, basePath: Path, start: Option[Timestamp],
                end: Option[Timestamp], et: EventType): Unit = {
    job.getConfiguration.set(BasePath, basePath.toString)
    job.getConfiguration.set(EventTypeName, et.keyPart)
    start.foreach(t => job.getConfiguration.set(StartTime, t.epochMillis.toString))
    end.foreach(t => job.getConfiguration.set(EndTime, t.epochMillis.toString))

*   FileInputFormat.addInputPath(job, basePath)
*   FileInputFormat.setInputPathFilter(job, classOf[HDFSEventLogPathFilter])
  }
}
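
As an aside, the Option round-trip in configure above can be sketched in
plain Java, with a HashMap standing in for the Hadoop Configuration (the
key names here are illustrative, not the real ones): only present values
are ever written, so an absent bound reads back as null on the filter
side, i.e. as None.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionRoundTrip {
    public static void main(String[] args) {
        // Stand-in for job.getConfiguration(): a plain string map.
        Map<String, String> conf = new HashMap<>();

        Optional<Long> start = Optional.empty();   // no start bound supplied
        Optional<Long> end   = Optional.of(1270671030416L);

        // Mirrors start.foreach(...) / end.foreach(...): only present
        // values are ever written into the configuration.
        start.ifPresent(t -> conf.put("startTime", t.toString()));
        end.ifPresent(t -> conf.put("endTime", t.toString()));

        // On the filter side an absent key reads back as null, which is
        // why a missing bound surfaces as None in the filter's log line.
        System.out.println(conf.get("startTime")); // null
        System.out.println(conf.get("endTime"));   // 1270671030416
    }
}
```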

class HDFSEventLogPathFilter extends PathFilter with Configurable {
  val log = Logger.get
  import HDFSEventLog._
  import HDFSEventLogPathFilter._

  var conf: Configuration = _
  lazy val basePath  = option(conf.get(BasePath)).map(new Path(_))
  lazy val eventType = option(conf.get(EventTypeName)).map(EventType(_))
  lazy val start     = option(conf.get(StartTime)).map(t => Timestamp(t.toLong))
  lazy val end       = option(conf.get(EndTime)).map(t => Timestamp(t.toLong))

  override def setConf(conf: Configuration): Unit = {
    this.conf = conf
    log.info(List(basePath, eventType, start, end).toString)
  }

  override def getConf = conf

  override def accept(p: Path) = {
    basePath.forall {
      bp => bp.equals(p.getParent) && eventType.forall { et =>
        p.getName.startsWith(et.keyPart) &&
        start.forall(t => p.getName >= (path(bp, et, t).getName)) &&
        end.forall(t => p.getName <= (path(bp, et, t).getName))
      }
    }
  }
}
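
For reference, here is a self-contained sketch (not the actual Hadoop
source; the Toy* names are stand-ins I made up) of how
ReflectionUtils-style instantiation injects the Configuration. setConf
runs once per constructed instance, so it can effectively fire more than
once over a job's lifetime if the framework builds more than one
instance of the filter class.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins for Hadoop's Configuration/Configurable, for illustration only.
class ToyConfiguration {
    private final Map<String, String> values = new HashMap<>();
    void set(String key, String value) { values.put(key, value); }
    String get(String key) { return values.get(key); }
}

interface ToyConfigurable {
    void setConf(ToyConfiguration conf);
    ToyConfiguration getConf();
}

public class ReflectionSketch {
    // Roughly what ReflectionUtils.newInstance does: construct via the
    // no-arg constructor, then call setConf if the instance is Configurable.
    static <T> T newInstance(Class<T> cls, ToyConfiguration conf) throws Exception {
        T instance = cls.getDeclaredConstructor().newInstance();
        if (instance instanceof ToyConfigurable) {
            ((ToyConfigurable) instance).setConf(conf);
        }
        return instance;
    }

    // A toy filter: its configuration only exists after injection.
    public static class ToyFilter implements ToyConfigurable {
        private ToyConfiguration conf;
        public void setConf(ToyConfiguration conf) { this.conf = conf; }
        public ToyConfiguration getConf() { return conf; }
        public boolean accept(String name) {
            return name.startsWith(conf.get("prefix"));
        }
    }

    public static void main(String[] args) throws Exception {
        ToyConfiguration conf = new ToyConfiguration();
        conf.set("prefix", "metrics_");
        ToyFilter filter = newInstance(ToyFilter.class, conf);
        System.out.println(filter.accept("metrics_1270671021526")); // true
        System.out.println(filter.accept("other_file"));            // false
    }
}
```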

Here is the calling code:

    log.info("Attempting to set input base path as: " + inPath.toString)

    using(FileSystem.get(conf)) {
      fs => log.info((fs.exists(inPath),
                      fs.getFileStatus(inPath).isDir,
                      fs.getFileStatus(inPath).getPath).toString)
    }

    HDFSEventLogPathFilter.configure(job, inPath, df.flatMap(_.start),
df.flatMap(_.end), MetricsEvent)

Here is the output that I get upon running the program. The first two
log statements are from the log.info statements just above; the third
is from the PathFilter itself.

INF [20100413-08:52:11.224] reporting: Attempting to set input base
path as: hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
INF [20100413-08:52:11.939] reporting:
(true,true,hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics)
INF [20100413-08:52:14.308] somegra:
List(Some(hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics),
Some(MetricsEvent), None, None)
10/04/13 08:52:13 WARN mapred.JobClient: Use GenericOptionsParser for
parsing the arguments. Applications should implement Tool for the
same.
Exception in thread "main"
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input
path does not exist:
hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
        at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
        at socialmedia.somegra.reporting.HDFSMapReduceQuery.execute(HDFSMetricsQuery.scala:66)

I'm uncertain whether this error is caused by the filter that I'm using
or by something more fundamental, but I'm really struggling to
understand how a path that has just been determined to exist is not
seen as existing by the FileInputFormat. As further evidence of its
existence, here's command line output showing the contents of this
directory:

[knuttycombe@floorshow reporting (kjn-reporting-fhadoop)]$ hadoop fs
-ls hdfs://hadoop-eventlog01.socialmedia.com/test-batchEventLog/metrics
Found 7530 items
-rw-rw-rw-   3 jetty       hadoop    1604399 2010-04-07 13:10
/test-batchEventLog/metrics/metrics_1270671021526
-rw-rw-rw-   3 jetty       hadoop    1588231 2010-04-07 13:10
/test-batchEventLog/metrics/metrics_1270671029284
-rw-rw-rw-   3 jetty       hadoop    1590922 2010-04-07 13:10
/test-batchEventLog/metrics/metrics_1270671030131
-rw-rw-rw-   3 jetty       hadoop    1595381 2010-04-07 13:10
/test-batchEventLog/metrics/metrics_1270671030416
...

What could I be doing wrong?

Thanks again for your help,

Kris

On Tue, Apr 13, 2010 at 6:49 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
> Kris,
>
> Here's a sample PathFilter which can be configured. The only thing you need
> to do is add the following line to configure the job:
>
> job.getConfiguration().set("pathfilter.pattern", "your_pattern");
>
> Not sure whether this is what you want.
>
>
> public class MyPathFilter implements PathFilter, Configurable {
>
>     private String pattern;
>
>     private Configuration conf;
>
>     public MyPathFilter() {
>     }
>
>     @Override
>     public boolean accept(Path path) {
>         if (path.getName().contains(conf.get("pathfilter.pattern"))) {
>             return true;
>         } else {
>             return false;
>         }
>     }
>
>     @Override
>     public Configuration getConf() {
>         return this.conf;
>     }
>
>     @Override
>     public void setConf(Configuration conf) {
>         this.conf = conf;
>     }
> }
>
>
>
> On Mon, Apr 12, 2010 at 4:05 PM, Kris Nuttycombe <kris.nuttycombe@gmail.com>
> wrote:
>>
>> Whoops, so much for that idea. The Configuration instance being passed
>> to setConf is null.
>>
>> I am utterly baffled. Is there seriously nobody out there using
>> PathFilter in this way? Everyone's just using dumb PathFilter
>> instances that don't have any configurable functionality?
>>
>> /me boggles.
>>
>> Kris
>>
>> On Mon, Apr 12, 2010 at 2:03 PM, Kris Nuttycombe
>> <kris.nuttycombe@gmail.com> wrote:
>> > I just dove into the source, and it looks like the PathFilter instance
>> > is instantiated using ReflectionUtils; setConf is called on it, so if
>> > the resulting PathFilter instance implements Configurable, the
>> > configuration will be available.
>> >
>> > Kris
>> >
>> > On Mon, Apr 12, 2010 at 1:52 PM, Kris Nuttycombe
>> > <kris.nuttycombe@gmail.com> wrote:
>> >> static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
>> >>
>> >> This indicates that reflection will be used to instantiate the
>> >> required PathFilter object, and I need to be able to access the
>> >> minimum and maximum date for a given run. I don't want to have to
>> >> implement a separate PathFilter class for each set of dates,
>> >> obviously.
>> >>
>> >> Thanks,
>> >>
>> >> Kris
>> >>
>> >> On Mon, Apr 12, 2010 at 9:35 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
>> >>>  Hi Kris,
>> >>>
>> >>> Do you mean you want to use the PathFilter in a map or reduce task?
>> >>> Or do you mean using the PathFilter in an InputFormat?
>> >>> I guess you mean the second case; if so, you only need to call
>> >>> FileInputFormat.setInputPathFilter(...) to provide the filter
>> >>> information.
>> >>>
>> >>>
>> >>> On Mon, Apr 12, 2010 at 8:13 AM, Kris Nuttycombe
>> >>> <kris.nuttycombe@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Hi, all, quick question about using PathFilter.
>> >>>>
>> >>>> Is there any way to provide information from the job configuration to
>> >>>> a PathFilter instance? In my case, I want to limit the date range of
>> >>>> the files being selected by the filter, and don't want to have to
>> >>>> hard-code a separate PathFilter instance for each date range I'm
>> >>>> interested in, obviously. If I make my PathFilter extend Configured,
>> >>>> will it do the right thing?
>> >>>>
>> >>>> Thanks!
>> >>>>
>> >>>> Kris
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Best Regards
>> >>>
>> >>> Jeff Zhang
>> >>>
>> >>
>> >
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
