From: "Doug Cutting (JIRA)"
To: hadoop-dev@lucene.apache.org
Date: Tue, 5 Sep 2006 10:17:27 -0700 (PDT)
Subject: [jira] Commented: (HADOOP-288) RFC: Efficient file caching

    [ http://issues.apache.org/jira/browse/HADOOP-288?page=comments#action_12432634 ]

Doug Cutting commented on HADOOP-288:
-------------------------------------

The indentation of this patch is non-standard. Please use 2 spaces per indent level, no tabs.

Should the JobConf setters be adders? For example, should setCacheFiles(String) instead be named addCacheFile(Path)? Also, should we use paths instead of strings?
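
For illustration only, the two signature styles under discussion might look as follows; neither is a committed JobConf method, and both names are assumptions made for this comparison:

  // Illustration of the two API styles discussed above; neither method exists
  // in JobConf yet, and both names are assumptions made for this comparison.
  import org.apache.hadoop.fs.Path;

  public interface CacheApiStyles {

    // Setter style (as in the current patch): one call replaces the whole
    // comma-separated list, and callers deal in raw strings.
    void setCacheFiles(String commaSeparatedHdfsPaths);

    // Adder style (suggested above): one call per file, typed as a Path,
    // so repeated calls accumulate entries instead of overwriting them.
    void addCacheFile(Path hdfsPath);
  }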

> RFC: Efficient file caching
> ---------------------------
>
>              Key: HADOOP-288
>              URL: http://issues.apache.org/jira/browse/HADOOP-288
>          Project: Hadoop
>       Issue Type: Bug
> Affects Versions: 0.6.0
>         Reporter: Michel Tourn
>      Assigned To: Mahadev konar
>      Attachments: caching-3.patch, caching.patch, caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
>
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data ("dictionaries").
>   This out-of-band data is:
> o in addition to the map/reduce inputs
> o large (1GB+)
> o broadcast (the same data is required on all the Task nodes)
> o changed "infrequently"; in particular:
>   oo it is always constant for all the Tasks in a Job
>   oo it is often constant for a month at a time
>   oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce
>
> Existing solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol.1 with caching.
>
> Sol.1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar file.
> Sol.2: Non-Hadoop: for each task node, run rsync from a single source for the data.
> Sol.3: Non-Hadoop: use BitTorrent, etc.
>
> Sol.1 is correct but slow, for many reasons:
>   The Job submitter must recreate a large jar (tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries.)
>   The large jar file must be propagated from the client to HDFS with
>   a large replication factor.
>   At the beginning of every Task, the Task tracker gets the job jar from HDFS
>   and unjars it in the working directory. This can dominate task execution time.
>
> Sol.2 has nice properties but also some problems.
>   It does not scale well with large clusters (many concurrent rsync read requests, i.e. single-source broadcast).
>   It assumes that Hadoop users can upload data using rsync to the cluster nodes. As a policy, this is not allowed.
>   It requires rsync.
>
> Sol.3 alleviates the rsync scalability problems, but:
>   It is a dependency on an external system.
>   We want something simpler and more tightly integrated with Hadoop.
>
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user machine
> (i.e. a MapReduce job submitter).
> Nevertheless it makes sense to use HDFS to store the original out-of-band data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
>
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as
> local files at predefined paths.
> ([1] Existing programs should be reusable with no changes.
> This is often possible because communication is over stdin/stdout.)
>
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cacheable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
> (no MapRed code changes and no changes in shipped data).
> This situation might occur with an 'extractor' job (one that gets data from an external source, like the Nutch crawler).
> Currently the Hadoop mapred-jar mechanism works in this way:
> the job jar data is unjarred in the "working directory" of the Task;
> the jar contains both the MapRed Java code (added to the classpath) and any bundled out-of-band data (dictionaries).
>
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
>
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options:
> 1. the archive/file timestamp
> 2. the MD5 of the archive/file content
> Comparing source and destination timestamps is problematic because it assumes synchronized clocks.
> Also there is no last-modified metadata in HDFS (for good reasons, like scalability of metadata ops).
> Timestamps stored with the source ('last-propagate-time') do
> not require synchronized clocks, only locally monotonic time
> (and the worst that can happen at a daylight-savings switch is a missed update or an extra update).
> The cache code could store a copy of the local timestamp
> in the same way that it caches the value of the content hash along with the source data.
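
A minimal sketch of what cache-key option 2 (content hash) could look like on top of the HDFS client API; the class and method names below are invented for this illustration and are not part of the proposal:

  // Illustrative sketch of cache-key option 2: use the MD5 of the file content
  // as the key. Class and method names are invented for this example.
  import java.io.IOException;
  import java.io.InputStream;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class CacheKey {

    // Stream the HDFS file through an MD5 digest and return it as a hex string.
    public static String md5Of(FileSystem fs, Path src)
        throws IOException, NoSuchAlgorithmException {
      MessageDigest md = MessageDigest.getInstance("MD5");
      InputStream in = fs.open(src);
      try {
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = in.read(buf)) != -1) {
          md.update(buf, 0, n);
        }
      } finally {
        in.close();
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : md.digest()) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    }
  }

Storing this hex string in a small 'metadata' file next to the cached copy would give the comparison key described above.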

> Cacheable unit:
> --------------
> Options: individual files, or archives, or both.
> Note:
> At the API level, directories will be processed recursively
> (and the local FS directories will parallel the HDFS directories),
> so bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o the cache key is computed on the archive and preserved, although
>   the archive itself is not preserved.
> The supported archive format will be tar (maybe tgz or a compressed jar).
> Archive detection test: by filename extension ".tar" or ".jar".
>
> Suppose we don't handle archives as special files:
> Pros:
> o less code, no discussion about which archive formats are supported
> o fine for large dictionary files; and when files are not large, the user may as well
>   put them in the Job jar as usual
> o user code could always check and unarchive specific cached files
>   (as a side-effect of MapRed task initialization)
> Cons:
> o handling small files may be inefficient
>   (multiple HDFS operations, multiple hash computations,
>   one 'metadata' hash file along with each small file)
> o it will not be possible to handle the Job's jar file as a special case of caching
>
> Cache isolation:
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while a Job is in progress:
> the file may become unavailable for a short period of time and some tasks fail;
> the file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded version in the Job code.
> o Before running the MapRed job, run a non-distributed application that determines
>   the latest available version of the out-of-band data,
>   then make this version available to the MapRed job.
>   There are two ways to do this:
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest");
>     (see Job Configuration for the meaning of this)
>   o or publish the decision as an HDFS file containing the version,
>     then rely on user code to read the version and manually populate the cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for the meaning of this)
>
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker.
> o Client-to-HDFS stage.
>   Options: A simple option is to not do anything here, i.e. rely on the user.
>   This is a reasonable option given the previous remarks on the role of HDFS:
>   HDFS is a staging/publishing area and a natural shared location.
>   In particular this means that the system need not track
>   where the client files come from.
> o HDFS-to-TaskTracker stage.
>   Client-to-HDFS synchronization (if done at all) should happen before this.
>   Then HDFS-to-TaskTracker synchronization must happen right before
>   the data is needed on a node.
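
A rough sketch (not part of the proposal) of what the HDFS-to-TaskTracker check-and-fetch step could look like, reusing the illustrative CacheKey helper above; the ".md5" sidecar file and all class names are assumptions:

  // Illustrative sketch: sync one HDFS file to the local cache only if its
  // content hash changed. The ".md5" sidecar file and the names are assumptions.
  import java.io.BufferedReader;
  import java.io.File;
  import java.io.FileReader;
  import java.io.FileWriter;
  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class TaskCacheSync {

    // Copy src from HDFS to localDst unless the locally recorded MD5 still matches.
    public static void syncFile(Configuration conf, Path src, File localDst)
        throws Exception {
      FileSystem fs = FileSystem.get(conf);
      String remoteMd5 = CacheKey.md5Of(fs, src);          // see the sketch above
      File md5File = new File(localDst.getPath() + ".md5");

      if (localDst.exists() && md5File.exists()
          && remoteMd5.equals(readFirstLine(md5File))) {
        return;                                            // cache is up to date
      }

      // Stale or missing: fetch the file, then record the hash it was fetched under.
      fs.copyToLocalFile(src, new Path(localDst.getPath()));
      FileWriter w = new FileWriter(md5File);
      try {
        w.write(remoteMd5);
      } finally {
        w.close();
      }
    }

    private static String readFirstLine(File f) throws IOException {
      BufferedReader r = new BufferedReader(new FileReader(f));
      try {
        return r.readLine();
      } finally {
        r.close();
      }
    }
  }

A real implementation would read a hash published alongside the source data rather than re-streaming the whole file on every check, in line with the 'metadata' hash file idea above.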

> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
>    require the user to put this logic in the map() (or reduce()) function:
>    in the MyMapper constructor (or in map() on the first record) the user is asked to add:
>
>      Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>      Cache.syncCache("..."); // etc.
>
> -----
> 2. Put this logic in the MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path).
>
>    Directories are processed recursively.
>    If archives are treated specially then they are unarchived on the destination.
>
> MapReduce Job Configuration:
> ---------------------------
> Options:
> With no change in MapReduce framework code (option 1 above):
>   no special Job configuration;
>   it is up to the MapRed writer to configure and run the cache operations.
> ---
> With the logic in the MapReduce framework (option 2 above):
>   some simple Job configuration:
>     JobConf.addCachePathPair(String, String)
>     JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");
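
To make option 2 concrete, here is a sketch of how the (hdfs path, local path) pairs could travel through a job property; the property name, the "#" separator, and the helper class are assumptions invented for this example, and TaskCacheSync refers to the earlier sketch:

  // Illustrative sketch of option 2's plumbing: the submitter records the
  // (hdfs path, local path) pairs in a job property, and the task side reads
  // them back and syncs each one. The property name and "#" separator are
  // arbitrary choices made for this example only.
  import java.io.File;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;

  public class CachePathPairs {

    private static final String KEY = "mapred.cache.path.pairs";

    // Submitter side: what a JobConf.addCachePathPair(String, String) could do.
    public static void addCachePathPair(JobConf job, String hdfsPath, String localPath) {
      String pair = hdfsPath + "#" + localPath;
      String existing = job.get(KEY, "");
      job.set(KEY, existing.length() == 0 ? pair : existing + "," + pair);
    }

    // Task side: sync every declared pair before the first map()/reduce() call.
    public static void syncAll(JobConf job) throws Exception {
      for (String pair : job.get(KEY, "").split(",")) {
        if (pair.length() == 0) {
          continue;
        }
        String[] p = pair.split("#");
        TaskCacheSync.syncFile(job, new Path(p[0]), new File(p[1])); // earlier sketch
      }
    }
  }

Under option 1 the job author would instead call the proposed Cache.syncCache(...) directly from the mapper, as shown in the API section above.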