From: David Savage <dave.savage@paremus.com>
Date: Wed, 05 Sep 2007 17:04:55 +0100
To: hadoop-user@lucene.apache.org
Subject: Hadoop in an OSGi environment

Hi All,

Just popping my head up over the lurkers' parapet for a second to let you know about some development work I've been doing on getting Hadoop to run in an OSGi environment. Sorry if this is the wrong forum for this post; I did consider sending it to the hadoop-dev list, but on inspection the traffic there seemed to be mostly Jira issue tracking, of which this is not yet a part...

To get Hadoop to work in an OSGi environment I had to make a couple of changes to the Hadoop core, mostly around better handling of classloading in the TaskTracker$Child process. Attached is a patch file of the changes I made, compiled against the 0.14.0 svn tag; comments are included at each diff point to explain what I'm doing. I'd obviously be really happy for these changes to be added to Hadoop, or for you to suggest any other changes or improvements.
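To give a feel for the approach before you dig into the patch, here is a minimal sketch of the classloader arrangement the child task ends up with (illustrative only - this is not the patch code, and the class/method names ChildClassLoaderSketch and buildTaskClassLoader are made up for the example): a URLClassLoader over the unpacked job directory whose parent is whatever classloader loaded the Hadoop API classes - in our case an OSGi bundle classloader rather than the child JVM's system classpath.

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class ChildClassLoaderSketch {

  // Build the classloader a child task should use: the unpacked job
  // directory (lib/ jars, classes/, and the directory itself) layered
  // on top of the classloader that already holds the Hadoop API classes.
  public static ClassLoader buildTaskClassLoader(File jobCacheDir,
                                                 ClassLoader parent)
      throws Exception {
    List<URL> urls = new ArrayList<URL>();

    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (File lib : libs) {
        urls.add(lib.toURI().toURL());
      }
    }
    urls.add(new File(jobCacheDir, "classes").toURI().toURL());
    urls.add(jobCacheDir.toURI().toURL());

    // In an OSGi container "parent" is the bundle classloader that loaded
    // Mapper/Reducer etc, so user classes resolve against it rather than
    // against the (near empty) system classpath of the child process.
    return new URLClassLoader(urls.toArray(new URL[urls.size()]), parent);
  }

  public static void main(String[] args) throws Exception {
    File jobDir = new File(args[0]);    // e.g. the job's unpacked "work" dir
    String userClass = args[1];         // e.g. the job's Mapper implementation
    ClassLoader loader = buildTaskClassLoader(
        jobDir, ChildClassLoaderSketch.class.getClassLoader());
    System.out.println("Loaded " + loader.loadClass(userClass).getName());
  }
}

In the patch itself this wiring lives in Task.configureClasspath(), which both MapTask.run() and ReduceTask.run() now call, rather than only the ReduceCopier as in stock 0.14.0.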
Short background with some product placement - please skip if you're not in the slightest bit interested... The aim of this work was to get Hadoop running against our product Infiniflow, which has an open-source component, Newton (http://newton.codecauldron.org). Newton is a distributed runtime based on OSGi which handles runtime deployment and failover of SCA composites over a compute fabric (our marketing term for a bunch of heterogeneous servers).

Final blurb... If anyone is able to make it, I should hopefully be presenting some of this work at the NYJavaSIG on 17th October, in a session entitled "OSGi In The Enterprise".

Regards,

Dave Savage
Software Engineer
Paremus Ltd
http://www.paremus.com
http://www.codecauldron.org

Attachment: hadoop-osgi.patch

Index: src/java/org/apache/hadoop/mapred/TaskLog.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskLog.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskLog.java (working copy)
@@ -198,8 +198,12 @@
                                                 File stderrFilename,
                                                 long tailLength
                                                ) throws IOException {
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);
+    /*
+      PATCH NOTE
+      Use absolute filenames to get around problem where parent process is not in same dir as child
+    */
+    String stdout = FileUtil.makeShellPath(stdoutFilename.getAbsoluteFile());
+    String stderr = FileUtil.makeShellPath(stderrFilename.getAbsoluteFile());
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
Index: src/java/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/java/org/apache/hadoop/mapred/Task.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/Task.java (working copy)
@@ -40,6 +40,16 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+/*
+  PATCH NOTE
+  Imports to support patch below
+*/
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import org.apache.hadoop.filecache.DistributedCache;
+
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
   private static final Log LOG =
@@ -205,6 +215,86 @@
     }
   }
+  /*
+    PATCH NOTE
+    This logic is basically a cut & paste of work done in org.apache.hadoop.mapred.TaskRunner.
+    The reason for doing this here is to get around the issue where the system classpath
+    does not contain the Hadoop api classes (i.e. Reducer, Mapper etc); instead these classes
+    are found on the osgi classpath loaded from the parent classloader.
+  */
+  public void configureClasspath(JobConf conf)
+    throws IOException {
+
+    // get the task and the current classloader which will become the parent
+    ClassLoader parent = conf.getClassLoader();
+
+    // get the work directory which holds the elements we are dynamically
+    // adding to the classpath
+    File workDir = new File(getJobFile()).getParentFile();
+    File jobCacheDir = new File(workDir.getParent(), "work");
+    ArrayList<URL> urllist = new ArrayList<URL>();
+
+    // add the jars and directories to the classpath
+    String jar = conf.getJar();
+    if (jar != null) {
+      File[] libs = new File(jobCacheDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          urllist.add(libs[i].toURL());
+        }
+      }
+      urllist.add(new File(jobCacheDir, "classes").toURL());
+      urllist.add(jobCacheDir.toURL());
+
+    }
+
+    URI[] archives = DistributedCache.getCacheArchives(conf);
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    // include the user specified classpath
+
+    //archive paths
+    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+    if (archiveClasspaths != null && archives != null) {
+      Path[] localArchives = DistributedCache
+        .getLocalCacheArchives(conf);
+      if (localArchives != null){
+        for (int i=0;i 0 ) {
+        File f = new File( element );
+
+        if ( f.exists() ) {
+          buf.append( f.getAbsolutePath() );
+        }
+        else {
+          throw new IllegalStateException( "Unknown path element " + tok );
+        }
+
+        buf.append( sep );
+      }
+    }
+
+    return buf.toString();
+}
+
+/**
  * Handle deprecated mapred.child.heap.size.
  * If present, interpolate into mapred.child.java.opts value with
  * warning.
@@ -399,6 +406,11 @@
       int exit_code = process.waitFor();
       if (!killed && exit_code != 0) {
+        /*
+          PATCH NOTE
+          Debug to help diagnose problems in launching process
+        */
+        LOG.warn( "Failed to execute : " + argsToString( args ) );
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");
       }
@@ -408,6 +420,24 @@
       kill();
     }
   }
+
+  /*
+    PATCH NOTE
+    Utility method used in logging
+  */
+  private String argsToString( List<String> args ) {
+    StringBuffer buf = new StringBuffer();
+
+    for ( String arg : args ) {
+      if ( buf.length() != 0 ) {
+        buf.append( ' ' );
+      }
+
+      buf.append( arg );
+    }
+
+    return buf.toString();
+  }
 
   /**
    * Kill the child process
Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTracker.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java (working copy)
@@ -21,6 +21,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,8 @@
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -641,7 +644,20 @@
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobFile));
+
+    /*
+      PATCH NOTE
+      Allow parent process to copy config to child job to enable
+      specification of child launcher and classpath in TaskRunner
+    */
+    JobConf conf = new JobConf(rjob.jobFile);
+    for ( Map.Entry<String, String> entry : fConf ) {
+      if ( entry.getKey().startsWith( "mapred.child" ) ) {
+        conf.set( entry.getKey(), entry.getValue() );
+      }
+    }
+
+    launchTaskForJob(tip, conf);
   }
 
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/ReduceTask.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java (working copy)
@@ -237,6 +237,11 @@
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    /*
+      PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task
+    */
+    configureClasspath(job);
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
@@ -735,46 +740,14 @@
       }
     }
-
-    private void configureClasspath(JobConf conf)
-      throws IOException {
-
-      // get the task and the current classloader which will become the parent
-      Task task = ReduceTask.this;
-      ClassLoader parent = conf.getClassLoader();
-
-      // get the work directory which holds the elements we are dynamically
-      // adding to the classpath
-      File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
-      ArrayList<URL> urllist = new ArrayList<URL>();
-
-      // add the jars and directories to the classpath
-      String jar = conf.getJar();
-      if (jar != null) {
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            urllist.add(libs[i].toURL());
-          }
-        }
-        urllist.add(new File(jobCacheDir, "classes").toURL());
-        urllist.add(jobCacheDir.toURL());
+    /*
+      PATCH NOTE
+      Move configureClasspath to parent class
+    */
-
-      }
-      urllist.add(workDir.toURL());
-
-      // create a new classloader with the old classloader as its parent
-      // then set that classloader as the one used by the current jobconf
-      URL[] urls = urllist.toArray(new URL[urllist.size()]);
-      URLClassLoader loader = new URLClassLoader(urls, parent);
-      conf.setClassLoader(loader);
-    }
-
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
       throws IOException {
-      configureClasspath(conf);
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;
       this.reduceTask = ReduceTask.this;
Index: src/java/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/MapTask.java (revision 570771)
+++ src/java/org/apache/hadoop/mapred/MapTask.java (working copy)
@@ -115,11 +115,18 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    /*
+      PATCH NOTE
+      See comment in org.apache.hadoop.mapred.Task
+    */
+    configureClasspath(job);
+
     final Reporter reporter = getReporter(umbilical);
 
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
+
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;