hadoop-common-dev mailing list archives

From Eric Baldeschwieler <eri...@yahoo-inc.com>
Subject Re: Hadoop in an OSGi environment
Date Thu, 06 Sep 2007 08:48:59 GMT
Sounds interesting.  Let us know of any success you have!

Cross posting to -dev so more folks will notice.

On Sep 5, 2007, at 2:47 PM, David Savage wrote:

> Thx very much - sorry for the spam in the previous mail, in that case.
>
> Yep, agreed - most of the changes were minor really. I'll do as you
> suggest and submit Jira issues for sensible subgroups.
>
> Regards,
>
> Dave
>
> Michael Bieniosek wrote:
> > The "hadoop way" of submitting patches is to create a JIRA issue  
> for each
> > patch so they can be tested and discussed separately.  It looks  
> like you
> > have several unrelated changes in there.  You'll also need to  
> regenerate
> > your patches against HEAD.
> >
> > It's always nice to have more contributors.  I'm glad to hear you  
> find
> > hadoop useful.
> >
> > -Michael
> >
> > On 9/5/07 9:04 AM, "David Savage" <dave.savage@paremus.com> wrote:
> >
> >> Hi All,
> >>
> >> Just popping my head up over the lurkers' parapet for a second to let you
> >> know about some development work I've been doing regarding getting
> >> Hadoop to run in an OSGi environment.
> >>
> >> Sorry if this is the wrong forum for this post. I did consider sending
> >> this to the hadoop-dev list, but on inspection of the traffic there it
> >> seemed to be mostly Jira issue tracking, of which this is not yet a part...
> >>
> >> In order to get Hadoop to work in an OSGi environment I had to make a
> >> couple of changes to the Hadoop core, mostly with regard to better
> >> handling of classloading in the TaskTracker$Child process.
> >>
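> >> The crux of the approach, as a minimal sketch (class and method names
> >> here are illustrative, not from the patch): give the task a
> >> URLClassLoader whose parent is the classloader that can already see the
> >> Hadoop and OSGi classes, so framework types resolve upwards while job
> >> classes come from the job jar.
> >>
> >> import java.io.File;
> >> import java.net.URL;
> >> import java.net.URLClassLoader;
> >>
> >> public class ChildLoaderSketch {
> >>   /** Build a loader for the job's code that delegates to the OSGi-side parent. */
> >>   public static ClassLoader forJobJar(File jobJar, ClassLoader parent)
> >>       throws Exception {
> >>     URL[] urls = new URL[] { jobJar.toURI().toURL() };
> >>     // Parent-first delegation: Mapper/Reducer interfaces load from the
> >>     // parent, user implementations from the job jar.
> >>     return new URLClassLoader(urls, parent);
> >>   }
> >> }
> >>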
> >> Attached is a patch file of the changes I made, compiled against
> >> the 0.14.0 svn tag - comments are included at each diff point to explain
> >> what I'm doing. I'd obviously be really happy for these changes to be
> >> added to Hadoop, or for you to suggest any other changes or improvements.
> >>
> >> Short background with some product placement - please skip if not in the
> >> slightest bit interested...
> >>
> >> The background to this work was to get Hadoop running against our
> >> product Infiniflow, of which there is an open-source component, Newton
> >> (http://newton.codecauldron.org).
> >>
> >> Newton is a distributed runtime based on OSGi which handles runtime
> >> deployment and failover of SCA composites over a compute fabric (our
> >> marketing term for a bunch of heterogeneous servers).
> >>
> >> Final blurb...
> >>
> >> If anyone is able to make it, I should hopefully be presenting some of
> >> this work at the NYJavaSIG on 17th October at a session entitled "OSGi
> >> In The Enterprise".
> >>
> >> Regards,
> >>
> >> Dave Savage
> >> Software Engineer
> >> Paremus Ltd
> >> http://www.paremus.com
> >> http://www.codecauldron.org
> >>
> >>
> >> Index: src/java/org/apache/hadoop/mapred/TaskLog.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/TaskLog.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/TaskLog.java (working copy)
> >> @@ -198,8 +198,12 @@
> >>                                                  File stderrFilename,
> >>                                                  long tailLength
> >>                                                 ) throws IOException {
> >> -    String stdout = FileUtil.makeShellPath(stdoutFilename);
> >> -    String stderr = FileUtil.makeShellPath(stderrFilename);
> >> +    /*
> >> +     PATCH NOTE
> >> +     Use absolute filename to get around problem where parent process not in same dir as child
> >> +     */
> >> +    String stdout = FileUtil.makeShellPath(stdoutFilename.getAbsoluteFile());
> >> +    String stderr = FileUtil.makeShellPath(stderrFilename.getAbsoluteFile());
> >>      List<String> result = new ArrayList<String>(3);
> >>      result.add(bashCommand);
> >>      result.add("-c");
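> >>
> >> To see why the absolute path matters, a standalone hedged sketch (the
> >> file name is illustrative): a relative java.io.File resolves against
> >> the current process's working directory, so the parent and the child
> >> can disagree about where "logs/stdout" lives.
> >>
> >> import java.io.File;
> >>
> >> public class RelativePathSketch {
> >>   public static void main(String[] args) {
> >>     File log = new File("logs/stdout");         // relative path
> >>     // Where this actually points depends on this JVM's user.dir,
> >>     // which need not match the directory the parent launched from.
> >>     System.out.println(log.getPath());
> >>     System.out.println(log.getAbsolutePath());  // pinned to user.dir
> >>   }
> >> }
> >>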
> >> Index: src/java/org/apache/hadoop/mapred/Task.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/Task.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/Task.java (working copy)
> >> @@ -40,6 +40,16 @@
> >>  import org.apache.hadoop.util.ReflectionUtils;
> >>  import org.apache.hadoop.util.StringUtils;
> >>
> >> +/*
> >> + PATCH NOTE
> >> + Imports to support patch below
> >> +*/
> >> +import java.io.File;
> >> +import java.net.URL;
> >> +import java.net.URLClassLoader;
> >> +import java.util.ArrayList;
> >> +import org.apache.hadoop.filecache.DistributedCache;
> >> +
> >>  /** Base class for tasks. */
> >>  abstract class Task implements Writable, Configurable {
> >>    private static final Log LOG =
> >> @@ -205,6 +215,86 @@
> >>      }
> >>    }
> >>
> >> +  /*
> >> +    PATCH NOTE
> >> +    This logic is basically a cut & paste of work done in org.apache.hadoop.mapred.TaskRunner
> >> +    The reason for doing this here is to get around the issue where the system classpath
> >> +    does not contain the Hadoop api classes (i.e. Reducer, Mapper etc) instead these classes
> >> +    are found in osgi classpath loaded from parent classloader.
> >> +  */
> >> +  public void configureClasspath(JobConf conf)
> >> +  throws IOException {
> >> +
> >> +   // get the task and the current classloader which will become the parent
> >> +   ClassLoader parent = conf.getClassLoader();
> >> +
> >> +   // get the work directory which holds the elements we are dynamically
> >> +   // adding to the classpath
> >> +   File workDir = new File(getJobFile()).getParentFile();
> >> +   File jobCacheDir = new File(workDir.getParent(), "work");
> >> +   ArrayList<URL> urllist = new ArrayList<URL>();
> >> +
> >> +   // add the jars and directories to the classpath
> >> +   String jar = conf.getJar();
> >> +   if (jar != null) {
> >> +    File[] libs = new File(jobCacheDir, "lib").listFiles();
> >> +    if (libs != null) {
> >> +     for (int i = 0; i < libs.length; i++) {
> >> +      urllist.add(libs[i].toURL());
> >> +     }
> >> +    }
> >> +    urllist.add(new File(jobCacheDir, "classes").toURL());
> >> +    urllist.add(jobCacheDir.toURL());
> >> +
> >> +   }
> >> +
> >> +      URI[] archives = DistributedCache.getCacheArchives(conf);
> >> +      URI[] files = DistributedCache.getCacheFiles(conf);
> >> +      // include the user specified classpath
> >> +
> >> +      //archive paths
> >> +      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
> >> +      if (archiveClasspaths != null && archives != null) {
> >> +        Path[] localArchives = DistributedCache
> >> +          .getLocalCacheArchives(conf);
> >> +        if (localArchives != null){
> >> +          for (int i=0;i<archives.length;i++){
> >> +            for(int j=0;j<archiveClasspaths.length;j++){
> >> +            if (archives[i].getPath().equals(
> >> +                archiveClasspaths[j].toString())){
> >> +                urllist.add(localArchives[i].toUri().toURL());
> >> +              }
> >> +            }
> >> +          }
> >> +        }
> >> +      }
> >> +      //file paths
> >> +      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
> >> +      if (fileClasspaths!=null && files != null) {
> >> +        Path[] localFiles = DistributedCache
> >> +          .getLocalCacheFiles(conf);
> >> +        if (localFiles != null) {
> >> +          for (int i = 0; i < files.length; i++) {
> >> +            for (int j = 0; j < fileClasspaths.length; j++) {
> >> +              if (files[i].getPath().equals(
> >> +                                            fileClasspaths[j].toString())) {
> >> +                  urllist.add(localFiles[i].toUri().toURL());
> >> +              }
> >> +            }
> >> +          }
> >> +        }
> >> +      }
> >> +
> >> +   urllist.add(workDir.toURL());
> >> +
> >> +   // create a new classloader with the old classloader as its parent
> >> +   // then set that classloader as the one used by the current jobconf
> >> +   URL[] urls = urllist.toArray(new URL[urllist.size()]);
> >> +   URLClassLoader loader = new URLClassLoader(urls, parent);
> >> +   conf.setClassLoader(loader);
> >> +  }
> >> +
> >> +
> >>    /** Run this task as a part of the named job.  This method is executed in the
> >>     * child process and is what invokes user-supplied map, reduce, etc. methods.
> >>     * @param umbilical for progress reports
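> >>
> >> For context, once conf.setClassLoader(loader) has run, user classes
> >> resolve through that loader - a hedged sketch of the lookup (the helper
> >> is illustrative; Hadoop itself goes through ReflectionUtils.newInstance):
> >>
> >> import org.apache.hadoop.mapred.JobConf;
> >>
> >> public class LoaderUseSketch {
> >>   /** Resolve a user-supplied class through the job's configured loader. */
> >>   public static Class<?> resolve(JobConf conf, String name) throws Exception {
> >>     // Framework types still come from the parent (OSGi) loader.
> >>     return Class.forName(name, true, conf.getClassLoader());
> >>   }
> >> }
> >>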
> >> Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/TaskRunner.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/TaskRunner.java (working copy)
> >> @@ -24,6 +24,7 @@
> >>  import org.apache.hadoop.util.*;
> >>  import java.io.*;
> >>  import java.util.List;
> >> +import java.util.StringTokenizer;
> >>  import java.util.Vector;
> >>  import java.net.URI;
> >>
> >> @@ -144,69 +145,32 @@
> >>        String sep = System.getProperty("path.separator");
> >>        StringBuffer classPath = new StringBuffer();
> >>        // start with same classpath as parent process
> >> -      classPath.append(System.getProperty("java.class.path"));
> >> +      /*
> >> +       PATCH NOTE
> >> +       qualify path to get around problem where parent process not in same dir as child
> >> +       */
> >> +      classPath.append(qualifyPath(System.getProperty("java.class.path")));
> >>        classPath.append(sep);
> >>        if (!workDir.mkdirs()) {
> >>          if (!workDir.isDirectory()) {
> >>          LOG.fatal("Mkdirs failed to create " + workDir.toString());
> >>          }
> >>        }
> >> -
> >> -      String jar = conf.getJar();
> >> -      if (jar != null) {
> >> -        // if jar exists, it into workDir
> >> -        File[] libs = new File(jobCacheDir, "lib").listFiles();
> >> -        if (libs != null) {
> >> -          for (int i = 0; i < libs.length; i++) {
> >> -            classPath.append(sep);            // add libs from jar to classpath
> >> -            classPath.append(libs[i]);
> >> -          }
> >> -        }
> >> -        classPath.append(sep);
> >> -        classPath.append(new File(jobCacheDir, "classes"));
> >> -        classPath.append(sep);
> >> -        classPath.append(jobCacheDir);
> >> -
> >> +      /*
> >> +       PATCH NOTE
> >> +       Allow extra jars on classpath not necessarily those found on system classpath
> >> +       (in osgi classpath is dynamically built by installed bundles vs statically configured
> >> +       at system boot)
> >> +       */
> >> +      String jobPath = conf.get( "mapred.child.java.classpath" );
> >> +      if ( jobPath != null ) {
> >> +       classPath.append( jobPath );
> >> +       classPath.append( sep );
> >>        }
> >> -
> >> -      // include the user specified classpath
> >> -
> >> -      //archive paths
> >> -      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
> >> -      if (archiveClasspaths != null && archives != null) {
> >> -        Path[] localArchives = DistributedCache
> >> -          .getLocalCacheArchives(conf);
> >> -        if (localArchives != null){
> >> -          for (int i=0;i<archives.length;i++){
> >> -            for(int j=0;j<archiveClasspaths.length;j++){
> >> -              if (archives[i].getPath().equals(
> >> -                archiveClasspaths[j].toString())){
> >> -                classPath.append(sep);
> >> -                classPath.append(localArchives[i]
> >> -                                 .toString());
> >> -              }
> >> -            }
> >> -          }
> >> -        }
> >> -      }
> >> -      //file paths
> >> -      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
> >> -      if (fileClasspaths!=null && files != null) {
> >> -        Path[] localFiles = DistributedCache
> >> -          .getLocalCacheFiles(conf);
> >> -        if (localFiles != null) {
> >> -          for (int i = 0; i < files.length; i++) {
> >> -            for (int j = 0; j < fileClasspaths.length; j++) {
> >> -              if (files[i].getPath().equals(
> >> -                                            fileClasspaths[j].toString())) {
> >> -                classPath.append(sep);
> >> -                classPath.append(localFiles[i].toString());
> >> -              }
> >> -            }
> >> -          }
> >> -        }
> >> -      }
> >> -
> >> +      /*
> >> +       PATCH NOTE
> >> +       Logic moved to Task.configureClasspath - see patch comment there
> >> +       */
> >>        classPath.append(sep);
> >>        classPath.append(workDir);
> >>        //  Build exec child jmv args.
> >> @@ -275,7 +239,16 @@
> >>          vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
> >>          vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
> >>
> >> -        // Add main class and its arguments
> >> +        // Add main class and its arguments
> >> +        /*
> >> +         PATCH NOTE
> >> +         Allow client to specify a launcher that launches your main class
> >> +         This allows me to substitute an osgi enabled jvm
> >> +         */
> >> +        String launcher = conf.get( "mapred.child.launcher" );
> >> +        if ( launcher != null ) {
> >> +         vargs.add( launcher );
> >> +        }
> >>          vargs.add(TaskTracker.Child.class.getName());  // main of Child
> >>          // pass umbilical port
> >>          vargs.add(Integer.toString(tracker.getTaskTrackerReportPort()));
> >> @@ -328,7 +301,41 @@
> >>    }
> >>
> >>
> >> + /*
> >> +  PATCH NOTE
> >> +  Utility method to qualify system classpath - could be more efficient (StringTokenizer bleugh)
> >> +  */
> >>    /**
> >> + * @param property
> >> + * @return
> >> + */
> >> + private String qualifyPath( String property ) {
> >> + StringBuffer buf = new StringBuffer(property.length() * 10);
> >> + String sep = System.getProperty("path.separator");
> >> +
> >> + StringTokenizer tok = new StringTokenizer(property, sep);
> >> +
> >> + while( tok.hasMoreTokens() ) {
> >> +  String element = tok.nextToken();
> >> +
> >> +  if ( element.trim().length() > 0 ) {
> >> +   File f = new File( element );
> >> +
> >> +   if ( f.exists() ) {
> >> +    buf.append( f.getAbsolutePath() );
> >> +   }
> >> +   else {
> >> +    throw new IllegalStateException( "Unknown path element " + element );
> >> +   }
> >> +
> >> +   buf.append( sep );
> >> +  }
> >> + }
> >> +
> >> + return buf.toString();
> >> +}
> >> +
> >> +/**
> >>     * Handle deprecated mapred.child.heap.size.
> >>     * If present, interpolate into mapred.child.java.opts value with
> >>     * warning.
> >> @@ -399,6 +406,11 @@
> >>        int exit_code = process.waitFor();
> >>
> >>        if (!killed && exit_code != 0) {
> >> +          /*
> >> +           PATCH NOTE
> >> +           Debug to help diagnose problems in launching process
> >> +           */
> >> +          LOG.warn( "Failed to execute : " + argsToString( args ) );
> >>          throw new IOException("Task process exit with nonzero status of " +
> >>                                exit_code + ".");
> >>        }
> >> @@ -408,6 +420,24 @@
> >>        kill();
> >>      }
> >>    }
> >> +
> >> +  /*
> >> +   PATCH NOTE
> >> +   Utility method used in logging
> >> +  */
> >> +  private String argsToString( List<String> args ) {
> >> +      StringBuffer buf = new StringBuffer();
> >> +
> >> +      for ( String arg : args ) {
> >> +          if ( buf.length() != 0 ) {
> >> +              buf.append( ' ' );
> >> +          }
> >> +
> >> +          buf.append( arg );
> >> +      }
> >> +
> >> +      return buf.toString();
> >> +  }
> >>
> >>    /**
> >>     * Kill the child process
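> >>
> >> A hedged usage sketch for the two properties the TaskRunner changes
> >> read - mapred.child.java.classpath and mapred.child.launcher come from
> >> the patch, but the values below are purely illustrative:
> >>
> >> import org.apache.hadoop.mapred.JobConf;
> >>
> >> public class ChildLaunchConfSketch {
> >>   public static void configure(JobConf conf) {
> >>     // Extra entries placed ahead of the child's classpath (value illustrative).
> >>     conf.set("mapred.child.java.classpath", "/opt/osgi/boot/framework.jar");
> >>     // Optional wrapper main class run before TaskTracker.Child (value illustrative).
> >>     conf.set("mapred.child.launcher", "com.example.OsgiChildLauncher");
> >>   }
> >> }
> >>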
> >> Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/TaskTracker.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/TaskTracker.java (working copy)
> >> @@ -21,6 +21,7 @@
> >>  import java.io.ByteArrayInputStream;
> >>  import java.io.ByteArrayOutputStream;
> >>  import java.io.File;
> >> +import java.io.FileInputStream;
> >>  import java.io.FilterOutputStream;
> >>  import java.io.IOException;
> >>  import java.io.InputStream;
> >> @@ -28,6 +29,8 @@
> >>  import java.io.PrintStream;
> >>  import java.net.BindException;
> >>  import java.net.InetSocketAddress;
> >> +import java.net.URL;
> >> +import java.net.URLClassLoader;
> >>  import java.util.ArrayList;
> >>  import java.util.HashMap;
> >>  import java.util.HashSet;
> >> @@ -641,7 +644,20 @@
> >>          rjob.localized = true;
> >>        }
> >>      }
> >> -    launchTaskForJob(tip, new JobConf(rjob.jobFile));
> >> +
> >> +    /*
> >> +     PATCH NOTE
> >> +     Allow parent process to copy config to child job to enable
> >> +     specification of child launcher and classpath in TaskRunner
> >> +     */
> >> +    JobConf conf = new JobConf(rjob.jobFile);
> >> +    for ( Map.Entry<String, String> entry : fConf ) {
> >> +        if ( entry.getKey().startsWith( "mapred.child" ) ) {
> >> +            conf.set( entry.getKey(), entry.getValue() );
> >> +        }
> >> +    }
> >> +
> >> +    launchTaskForJob(tip, conf);
> >>    }
> >>
> >>    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
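> >>
> >> The copy loop above leans on Configuration being iterable over its
> >> entries, as the patch's for-each does; the same prefix-copy idea as a
> >> standalone, hedged sketch:
> >>
> >> import java.util.Map;
> >> import org.apache.hadoop.conf.Configuration;
> >>
> >> public class ConfCopySketch {
> >>   /** Copy every mapred.child.* entry from the tracker's conf to the job's. */
> >>   public static void copyChildSettings(Configuration from, Configuration to) {
> >>     for (Map.Entry<String, String> entry : from) {
> >>       if (entry.getKey().startsWith("mapred.child")) {
> >>         to.set(entry.getKey(), entry.getValue());
> >>       }
> >>     }
> >>   }
> >> }
> >>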
> >> Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/ReduceTask.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/ReduceTask.java (working copy)
> >> @@ -237,6 +237,11 @@
> >>
> >>    public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
> >>      throws IOException {
> >> +    /*
> >> +     PATCH NOTE
> >> +     See comment in org.apache.hadoop.mapred.Task
> >> +     */
> >> +    configureClasspath(job);
> >>      Reducer reducer = (Reducer)ReflectionUtils.newInstance(
> >>                                    job.getReducerClass(), job);
> >>
> >> @@ -735,46 +740,14 @@
> >>        }
> >>
> >>      }
> >> -
> >> -    private void configureClasspath(JobConf conf)
> >> -      throws IOException {
> >> -
> >> -      // get the task and the current classloader which will become the parent
> >> -      Task task = ReduceTask.this;
> >> -      ClassLoader parent = conf.getClassLoader();
> >> -
> >> -      // get the work directory which holds the elements we are dynamically
> >> -      // adding to the classpath
> >> -      File workDir = new File(task.getJobFile()).getParentFile();
> >> -      File jobCacheDir = new File(workDir.getParent(), "work");
> >> -      ArrayList<URL> urllist = new ArrayList<URL>();
> >> -
> >> -      // add the jars and directories to the classpath
> >> -      String jar = conf.getJar();
> >> -      if (jar != null) {
> >> -        File[] libs = new File(jobCacheDir, "lib").listFiles();
> >> -        if (libs != null) {
> >> -          for (int i = 0; i < libs.length; i++) {
> >> -            urllist.add(libs[i].toURL());
> >> -          }
> >> -        }
> >> -        urllist.add(new File(jobCacheDir, "classes").toURL());
> >> -        urllist.add(jobCacheDir.toURL());
> >> +    /*
> >> +     PATCH NOTE
> >> +  Move configureClasspath to parent class
> >> +     */
> >>
> >> -      }
> >> -      urllist.add(workDir.toURL());
> >> -
> >> -      // create a new classloader with the old classloader as its parent
> >> -      // then set that classloader as the one used by the current jobconf
> >> -      URL[] urls = urllist.toArray(new URL[urllist.size()]);
> >> -      URLClassLoader loader = new URLClassLoader(urls, parent);
> >> -      conf.setClassLoader(loader);
> >> -    }
> >> -
> >>      public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
> >>        throws IOException {
> >>
> >> -      configureClasspath(conf);
> >>        this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
> >>        this.umbilical = umbilical;
> >>        this.reduceTask = ReduceTask.this;
> >> Index: src/java/org/apache/hadoop/mapred/MapTask.java
> >> ===================================================================
> >> --- src/java/org/apache/hadoop/mapred/MapTask.java (revision 570771)
> >> +++ src/java/org/apache/hadoop/mapred/MapTask.java (working copy)
> >> @@ -115,11 +115,18 @@
> >>    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
> >>      throws IOException {
> >>
> >> +    /*
> >> +     PATCH NOTE
> >> +     See comment in org.apache.hadoop.mapred.Task
> >> +     */
> >> +    configureClasspath(job);
> >> +
> >>      final Reporter reporter = getReporter(umbilical);
> >>
> >>      // start thread that will handle communication with parent
> >>      startCommunicationThread(umbilical);
> >>
> >> +
> >>      int numReduceTasks = conf.getNumReduceTasks();
> >>      LOG.info("numReduceTasks: " + numReduceTasks);
> >>      MapOutputCollector collector = null;
> >
> >

