hadoop-common-user mailing list archives

From Michael Bieniosek <mich...@powerset.com>
Subject Re: Hadoop in an OSGi environment
Date Wed, 05 Sep 2007 17:31:53 GMT
The "hadoop way" of submitting patches is to create a JIRA issue for each
patch so they can be tested and discussed separately.  It looks like you
have several unrelated changes in there.  You'll also need to regenerate
your patches against HEAD.
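
For example (illustrative only - substitute the real issue key once you've
filed it), each independent change would get its own patch regenerated from
an up-to-date trunk checkout and attached to its own JIRA issue:

    svn up
    svn diff src/java/org/apache/hadoop/mapred/TaskLog.java > HADOOP-XXXX.patch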

It's always nice to have more contributors.  I'm glad to hear you find
hadoop useful.

-Michael

On 9/5/07 9:04 AM, "David Savage" <dave.savage@paremus.com> wrote:

> Hi All,
> 
> Just popping my head up over the lurkers parapet for a second to let you
> know about some development work I've been doing regarding getting
> Hadoop to run in an OSGi environment.
> 
> Sorry if this is the wrong forum for this post. I did consider sending
> this to the hadoop-dev list, but on inspection the traffic there seemed
> to be mostly JIRA issue tracking, of which this is not yet a part...
> 
> In order to get Hadoop to work in an OSGi environment I had to make a
> couple of changes to the Hadoop core, mostly with regard to better
> handling of classloading in the TaskTracker$Child process.
> 
> Attached is a patch file of the changes I made, compiled against the
> 0.14.0 svn tag - comments are included at each diff point to explain
> what I'm doing. Obviously I'd be really happy for these changes to be added
> to Hadoop, or for you to suggest any other changes or improvements.
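> 
> Two configuration properties are introduced along the way, which the
> TaskTracker copies down to the child task: mapred.child.launcher (a main
> class inserted ahead of TaskTracker$Child on the java command line, so an
> OSGi-enabled launcher can start the child) and mapred.child.java.classpath
> (extra entries added to the child's classpath). The property names are from
> the patch; the values below are purely illustrative for our setup:
> 
>     JobConf conf = new JobConf();
>     // hypothetical values - these would normally live in the tasktracker's hadoop-site.xml
>     conf.set("mapred.child.launcher", "org.example.osgi.ChildLauncher");
>     conf.set("mapred.child.java.classpath", "/opt/osgi/bundles/hadoop-api.jar");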
> 
> Short background with some product placement - please skip if not in the
> slightest bit interested...
> 
> The background to this work was to get Hadoop running against our
> product Infiniflow, which has an open-source component, Newton
> (http://newton.codecauldron.org).
> 
> Newton is a distributed runtime based on OSGi which handles runtime
> deployment and failover of SCA composites over a compute fabric (our
> marketing term for a bunch of heterogeneous servers).
> 
> Final blurb...
> 
> If anyone is able to make it I should hopefully be presenting some of
> this work at the NYJavaSIG on 17th October at a session entitled "OSGi
> In The Enterprise".
> 
> Regards,
> 
> Dave Savage
> Software Engineer
> Paremus Ltd
> http://www.paremus.com
> http://www.codecauldron.org
> 
> 
> _______________________________________________________________________
> Paremus Limited. Registered in England
>  No. 4181472
>  Registered Office: St Alphage House, 2 Fore Street, London, EC2Y 5DH Postal
> Address: 107-111 Fleet Street, London, EC4A 2AB
> The information transmitted is intended only for the person(s) or entity to
> which it is addressed and may contain confidential and/or privileged material.
> Any review, retransmission, dissemination or other use of, or taking of any
> action in reliance upon, this information by persons or entities other than
> the intended recipient is prohibited.
> If you received this in error, please contact the sender and delete the
> material from any computer.
> _______________________________________________________________________
> Index: src/java/org/apache/hadoop/mapred/TaskLog.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/TaskLog.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/TaskLog.java (working copy)
> @@ -198,8 +198,12 @@
>                                                  File stderrFilename,
>                                                  long tailLength
>                                                 ) throws IOException {
> -    String stdout = FileUtil.makeShellPath(stdoutFilename);
> -    String stderr = FileUtil.makeShellPath(stderrFilename);
> +    /* 
> +     PATCH NOTE
> +     Use absolute filename to get around problem where parent process not in same dir as child
> +     */
> +    String stdout = FileUtil.makeShellPath(stdoutFilename.getAbsoluteFile());
> +    String stderr = FileUtil.makeShellPath(stderrFilename.getAbsoluteFile());
>      List<String> result = new ArrayList<String>(3);
>      result.add(bashCommand);
>      result.add("-c");
> Index: src/java/org/apache/hadoop/mapred/Task.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/Task.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/Task.java (working copy)
> @@ -40,6 +40,16 @@
>  import org.apache.hadoop.util.ReflectionUtils;
>  import org.apache.hadoop.util.StringUtils;
>  
> +/* 
> + PATCH NOTE
> + Imports to support patch below
> +*/
> +import java.io.File;
> +import java.net.URL;
> +import java.net.URLClassLoader;
> +import java.util.ArrayList;
> +import org.apache.hadoop.filecache.DistributedCache;
> +
>  /** Base class for tasks. */
>  abstract class Task implements Writable, Configurable {
>    private static final Log LOG =
> @@ -205,6 +215,86 @@
>      }
>    }
>    
> +  /* 
> +    PATCH NOTE
> +    This logic is basically a cut & paste of work done in org.apache.hadoop.mapred.TaskRunner.
> +    The reason for doing this here is to get around the issue where the system classpath
> +    does not contain the Hadoop API classes (i.e. Reducer, Mapper etc.); instead these classes
> +    are found on the OSGi classpath loaded from the parent classloader.
> +  */
> +  public void configureClasspath(JobConf conf)
> +  throws IOException {
> +
> +   // get the task and the current classloader which will become the parent
> +   ClassLoader parent = conf.getClassLoader();
> +
> +   // get the work directory which holds the elements we are dynamically
> +   // adding to the classpath
> +   File workDir = new File(getJobFile()).getParentFile();
> +   File jobCacheDir = new File(workDir.getParent(), "work");
> +   ArrayList<URL> urllist = new ArrayList<URL>();
> +
> +   // add the jars and directories to the classpath
> +   String jar = conf.getJar();
> +   if (jar != null) {
> +    File[] libs = new File(jobCacheDir, "lib").listFiles();
> +    if (libs != null) {
> +     for (int i = 0; i < libs.length; i++) {
> +      urllist.add(libs[i].toURL());
> +     }
> +    }
> +    urllist.add(new File(jobCacheDir, "classes").toURL());
> +    urllist.add(jobCacheDir.toURL());
> +
> +   }
> +   
> +      URI[] archives = DistributedCache.getCacheArchives(conf);
> +      URI[] files = DistributedCache.getCacheFiles(conf);
> +      // include the user specified classpath
> +  
> +      //archive paths
> +      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
> +      if (archiveClasspaths != null && archives != null) {
> +        Path[] localArchives = DistributedCache
> +          .getLocalCacheArchives(conf);
> +        if (localArchives != null){
> +          for (int i=0;i<archives.length;i++){
> +            for(int j=0;j<archiveClasspaths.length;j++){
> +              if (archives[i].getPath().equals(
> +                                               archiveClasspaths[j].toString())){
> +                urllist.add(localArchives[i].toUri().toURL());
> +              }
> +            }
> +          }
> +        }
> +      }
> +      //file paths
> +      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
> +      if (fileClasspaths!=null && files != null) {
> +        Path[] localFiles = DistributedCache
> +          .getLocalCacheFiles(conf);
> +        if (localFiles != null) {
> +          for (int i = 0; i < files.length; i++) {
> +            for (int j = 0; j < fileClasspaths.length; j++) {
> +              if (files[i].getPath().equals(
> +                                            fileClasspaths[j].toString())) {
> +                  urllist.add(localFiles[i].toUri().toURL());
> +              }
> +            }
> +          }
> +        }
> +      }
> +   
> +   urllist.add(workDir.toURL());
> +
> +   // create a new classloader with the old classloader as its parent
> +   // then set that classloader as the one used by the current jobconf
> +   URL[] urls = urllist.toArray(new URL[urllist.size()]);
> +   URLClassLoader loader = new URLClassLoader(urls, parent);
> +   conf.setClassLoader(loader);
> +  }
> +
> +  
>    /** Run this task as a part of the named job.  This method is executed in the
>     * child process and is what invokes user-supplied map, reduce, etc. methods.
>     * @param umbilical for progress reports
> Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/TaskRunner.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/TaskRunner.java (working copy)
> @@ -24,6 +24,7 @@
>  import org.apache.hadoop.util.*;
>  import java.io.*;
>  import java.util.List;
> +import java.util.StringTokenizer;
>  import java.util.Vector;
>  import java.net.URI;
>  
> @@ -144,69 +145,32 @@
>        String sep = System.getProperty("path.separator");
>        StringBuffer classPath = new StringBuffer();
>        // start with same classpath as parent process
> -      classPath.append(System.getProperty("java.class.path"));
> +      /* 
> +       PATCH NOTE
> +       Qualify path to get around problem where parent process not in same dir as child
> +       */
> +      classPath.append(qualifyPath(System.getProperty("java.class.path")));
>        classPath.append(sep);
>        if (!workDir.mkdirs()) {
>          if (!workDir.isDirectory()) {
>            LOG.fatal("Mkdirs failed to create " + workDir.toString());
>          }
>        }
> -   
> -      String jar = conf.getJar();
> -      if (jar != null) {
> -        // if jar exists, it into workDir
> -        File[] libs = new File(jobCacheDir, "lib").listFiles();
> -        if (libs != null) {
> -          for (int i = 0; i < libs.length; i++) {
> -            classPath.append(sep);            // add libs from jar to classpath
> -            classPath.append(libs[i]);
> -          }
> -        }
> -        classPath.append(sep);
> -        classPath.append(new File(jobCacheDir, "classes"));
> -        classPath.append(sep);
> -        classPath.append(jobCacheDir);
> -       
> +      /* 
> +       PATCH NOTE
> +       Allow extra jars on the classpath, not necessarily those found on the system classpath
> +       (in OSGi the classpath is built dynamically by installed bundles vs statically configured
> +       at system boot)
> +       */      
> +      String jobPath = conf.get( "mapred.child.java.classpath" );
> +      if ( jobPath != null ) {
> +       classPath.append( jobPath );
> +       classPath.append( sep );
>        }
> -
> -      // include the user specified classpath
> -    
> -      //archive paths
> -      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
> -      if (archiveClasspaths != null && archives != null) {
> -        Path[] localArchives = DistributedCache
> -          .getLocalCacheArchives(conf);
> -        if (localArchives != null){
> -          for (int i=0;i<archives.length;i++){
> -            for(int j=0;j<archiveClasspaths.length;j++){
> -              if (archives[i].getPath().equals(
> -                                               archiveClasspaths[j].toString())){
> -                classPath.append(sep);
> -                classPath.append(localArchives[i]
> -                                 .toString());
> -              }
> -            }
> -          }
> -        }
> -      }
> -      //file paths
> -      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
> -      if (fileClasspaths!=null && files != null) {
> -        Path[] localFiles = DistributedCache
> -          .getLocalCacheFiles(conf);
> -        if (localFiles != null) {
> -          for (int i = 0; i < files.length; i++) {
> -            for (int j = 0; j < fileClasspaths.length; j++) {
> -              if (files[i].getPath().equals(
> -                                            fileClasspaths[j].toString())) {
> -                classPath.append(sep);
> -                classPath.append(localFiles[i].toString());
> -              }
> -            }
> -          }
> -        }
> -      }
> -
> +   /* 
> +       PATCH NOTE
> +    Logic moved to Task.configureClasspath - see patch comment there
> +       */
>        classPath.append(sep);
>        classPath.append(workDir);
>        //  Build exec child jmv args.
> @@ -275,7 +239,16 @@
>          vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
>          vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
>  
> -        // Add main class and its arguments
> +        // Add main class and its arguments
> +        /* 
> +         PATCH NOTE
> +      Allow client to specify a launcher that launches your main class
> +         This allows me to substitute an OSGi-enabled JVM
> +         */
> +        String launcher = conf.get( "mapred.child.launcher" );
> +        if ( launcher != null ) {
> +         vargs.add( launcher );
> +        }
>          vargs.add(TaskTracker.Child.class.getName());  // main of Child
>          // pass umbilical port
>          vargs.add(Integer.toString(tracker.getTaskTrackerReportPort()));
> @@ -328,7 +301,41 @@
>    }
>  
>    
> + /* 
> +  PATCH NOTE
> +  Utility method to qualify system classpath - could be more efficient (StringTokenizer bleugh)
> +  */
>    /**
> + * @param property
> + * @return
> + */
> + private String qualifyPath( String property ) {
> + StringBuffer buf = new StringBuffer(property.length() * 10);
> + String sep = System.getProperty("path.separator");
> + 
> + StringTokenizer tok = new StringTokenizer(property, sep);
> + 
> + while( tok.hasMoreTokens() ) {
> +  String element = tok.nextToken();
> + 
> +  if ( element.trim().length() > 0 ) {
> +   File f = new File( element );
> +   
> +   if ( f.exists() ) {
> +    buf.append( f.getAbsolutePath() );
> +   }
> +   else {
> +    throw new IllegalStateException( "Unknown path element " + element );
> +   }
> +   
> +   buf.append( sep );
> +  }
> + }
> + 
> + return buf.toString();
> +}
> +
> +/**
>     * Handle deprecated mapred.child.heap.size.
>     * If present, interpolate into mapred.child.java.opts value with
>     * warning.
> @@ -399,6 +406,11 @@
>        int exit_code = process.waitFor();
>       
>        if (!killed && exit_code != 0) {
> +          /* 
> +           PATCH NOTE
> +           Debug to help diagnose problems in launching process
> +           */
> +          LOG.warn( "Failed to execute : " + argsToString( args ) );
>          throw new IOException("Task process exit with nonzero status of " +
>                                exit_code + ".");
>        }
> @@ -408,6 +420,24 @@
>        kill();   
>      }
>    }
> +  
> +  /* 
> +   PATCH NOTE
> +   Utility method used in logging
> +  */
> +  private String argsToString( List<String> args ) {
> +      StringBuffer buf = new StringBuffer();
> +      
> +      for ( String arg : args ) {
> +          if ( buf.length() != 0 ) {
> +              buf.append( ' ' );
> +          }
> +          
> +          buf.append( arg );
> +      }
> +      
> +      return buf.toString();
> +  }
>  
>    /**
>     * Kill the child process
> Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/TaskTracker.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/TaskTracker.java (working copy)
> @@ -21,6 +21,7 @@
>  import java.io.ByteArrayInputStream;
>  import java.io.ByteArrayOutputStream;
>  import java.io.File;
> +import java.io.FileInputStream;
>  import java.io.FilterOutputStream;
>  import java.io.IOException;
>  import java.io.InputStream;
> @@ -28,6 +29,8 @@
>  import java.io.PrintStream;
>  import java.net.BindException;
>  import java.net.InetSocketAddress;
> +import java.net.URL;
> +import java.net.URLClassLoader;
>  import java.util.ArrayList;
>  import java.util.HashMap;
>  import java.util.HashSet;
> @@ -641,7 +644,20 @@
>          rjob.localized = true;
>        }
>      }
> -    launchTaskForJob(tip, new JobConf(rjob.jobFile));
> +    
> +    /* 
> +     PATCH NOTE
> +  Allow parent process to copy config to child job to enable
> +     specification of child launcher and classpath in TaskRunner
> +     */
> +    JobConf conf = new JobConf(rjob.jobFile);
> +    for ( Map.Entry<String, String> entry : fConf ) {
> +        if ( entry.getKey().startsWith( "mapred.child" ) ) {
> +            conf.set( entry.getKey(), entry.getValue() );
> +        }
> +    }
> +    
> +    launchTaskForJob(tip, conf);
>    }
>      
>    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws
> IOException{
> Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/ReduceTask.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/ReduceTask.java (working copy)
> @@ -237,6 +237,11 @@
>  
>    public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
>      throws IOException {
> + /* 
> +      PATCH NOTE
> +      See comment in org.apache.hadoop.mapred.Task
> +    */
> + configureClasspath(job);
>      Reducer reducer = (Reducer)ReflectionUtils.newInstance(
>                                                             job.getReducerClass(), job);
>  
> @@ -735,46 +740,14 @@
>        }
>        
>      }
> -    
> -    private void configureClasspath(JobConf conf)
> -      throws IOException {
> -      
> -      // get the task and the current classloader which will become the parent
> -      Task task = ReduceTask.this;
> -      ClassLoader parent = conf.getClassLoader();
> -      
> -      // get the work directory which holds the elements we are dynamically
> -      // adding to the classpath
> -      File workDir = new File(task.getJobFile()).getParentFile();
> -      File jobCacheDir = new File(workDir.getParent(), "work");
> -      ArrayList<URL> urllist = new ArrayList<URL>();
> -      
> -      // add the jars and directories to the classpath
> -      String jar = conf.getJar();
> -      if (jar != null) {
> -        File[] libs = new File(jobCacheDir, "lib").listFiles();
> -        if (libs != null) {
> -          for (int i = 0; i < libs.length; i++) {
> -            urllist.add(libs[i].toURL());
> -          }
> -        }
> -        urllist.add(new File(jobCacheDir, "classes").toURL());
> -        urllist.add(jobCacheDir.toURL());
> +    /* 
> +     PATCH NOTE
> +  Move configureClasspath to parent class
> +     */    
>          
> -      }
> -      urllist.add(workDir.toURL());
> -      
> -      // create a new classloader with the old classloader as its parent
> -      // then set that classloader as the one used by the current jobconf
> -      URL[] urls = urllist.toArray(new URL[urllist.size()]);
> -      URLClassLoader loader = new URLClassLoader(urls, parent);
> -      conf.setClassLoader(loader);
> -    }
> -    
>      public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
>        throws IOException {
>        
> -      configureClasspath(conf);
>        this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
>        this.umbilical = umbilical;
>        this.reduceTask = ReduceTask.this;
> Index: src/java/org/apache/hadoop/mapred/MapTask.java
> ===================================================================
> --- src/java/org/apache/hadoop/mapred/MapTask.java (revision 570771)
> +++ src/java/org/apache/hadoop/mapred/MapTask.java (working copy)
> @@ -115,11 +115,18 @@
>    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
>      throws IOException {
>  
> + /* 
> +      PATCH NOTE
> +      See comment in org.apache.hadoop.mapred.Task
> +    */
> +    configureClasspath(job);
> +    
>      final Reporter reporter = getReporter(umbilical);
>  
>      // start thread that will handle communication with parent
>      startCommunicationThread(umbilical);
>  
> +    
>      int numReduceTasks = conf.getNumReduceTasks();
>      LOG.info("numReduceTasks: " + numReduceTasks);
>      MapOutputCollector collector = null;

