From: "Randy Wilson (JIRA)"
To: mapreduce-dev@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Date: Mon, 11 Oct 2010 13:21:33 -0400 (EDT)
Subject: [jira] Created: (MAPREDUCE-2123) Multiple threads per JVM

Multiple threads per JVM
------------------------

                 Key: MAPREDUCE-2123
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
            Reporter: Randy Wilson

I have a process that standardizes name and place strings, and requires access to Java
objects that require a lot of RAM (about 800MB). Hadoop (via Amazon Elastic MapReduce) was running out of memory because it was firing up one JVM per task per slave. Each JVM needed 1.5GB, and six of them blew out memory. In this case we don't need six different JVMs running; we only need one, but with multiple threads.

I tried using a MultithreadedMapper, but it doesn't have a thread-safe run() method: it makes three calls to the input source to read one "line", which doesn't work if multiple threads are doing that. So I had to override the run() method. Overriding it turned out to be so much work that it was simpler to skip the MultithreadedMapper entirely. Instead, I took my original mapper and overrode its run() method directly. I fired up n threads, each of which called a method with a synchronized(mutex) block around the part of the process that makes the three calls to the input source to get the next line to operate on. Then, outside the synchronized block, each thread called the map() method, which made use of the large, shared (singleton) standardization object.

All of this made me wonder why Hadoop fires up multiple JVMs per slave in the first place; that is a lot of overhead per thread. I've also been warned that continually reusing JVMs instead of restarting one per task will use up more memory. That should only be true if Hadoop (or our mapper) is leaking memory, and if that's the case, let's get it fixed.

My guess is that since Hadoop can run tasks in languages other than Java, and since other languages may have less overhead per process, firing up a JVM per task (or per thread) simplifies Hadoop. But the multithreaded solution we wrote is very general-purpose. It seems like it ought to be pretty much the default solution in Java, and a "...map.threads" property should be all that is required to fire up that many threads for each task, rather than having to jump through the hoops we did.
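As an aside, the large, shared (singleton) standardization object mentioned above can be created lazily and safely across mapper threads with the initialization-on-demand holder idiom, so only one copy exists per JVM no matter how many threads ask for it. Below is a minimal sketch; the Standardizer class, its constructor's loading step, and the standardize() method are hypothetical stand-ins for illustration, not the actual code from the job:

```java
// Hypothetical sketch: a JVM-wide singleton holding the large (~800MB)
// standardization data, shared by all mapper threads. Class and method
// names are illustrative, not taken from the real job.
class Standardizer {

    private Standardizer() {
        // The real constructor would load the large reference data here.
    }

    // Initialization-on-demand holder idiom: the JVM guarantees Holder is
    // initialized exactly once, on first use, with no explicit locking.
    private static class Holder {
        static final Standardizer INSTANCE = new Standardizer();
    }

    static Standardizer getInstance() {
        return Holder.INSTANCE;
    }

    // Stand-in for the real name/place standardization. Thread-safe because
    // it touches no mutable shared state.
    String standardize(String raw) {
        return raw.trim().toLowerCase();
    }
}
```

Each mapper thread can then call Standardizer.getInstance().standardize(...) from map() with no further synchronization, as long as the standardizer itself is read-only after construction.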
Below is the implementation that seems to be working.

In the main class:

    Configuration config = getConf();
    config.set("num_threads_per_jvm", Integer.toString(numThreads));
    Job job = new Job(config, "Standardize stuff");

In the Mapper class:

    // Mutex guarding the (non-thread-safe) reads from the input context.
    private final Object contextMutex = new Object();

    @Override
    public void run(final Context context) throws IOException, InterruptedException {
      int numThreads = Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm"));
      setup(context); // setup and cleanup just once, rather than once per thread
      List<MapRunner> mapRunners = new ArrayList<MapRunner>();
      for (int i = 0; i < numThreads; i++) {
        MapRunner mapRunner = new MapRunner(context, i);
        mapRunners.add(mapRunner);
        mapRunner.start();
      }
      // Wait for all the threads to complete
      for (MapRunner mapRunner : mapRunners) {
        mapRunner.join();
      }
      cleanup(context);
    }

    private class MapRunner extends Thread {
      private final Context context;
      private final int threadIndex; // available for logging/debugging

      private MapRunner(Context context, int threadIndex) {
        this.context = context;
        this.threadIndex = threadIndex;
      }

      @Override
      public void run() {
        boolean gotValue = true;
        do {
          try {
            Text key = null;
            Text value = null;
            synchronized (contextMutex) {
              gotValue = context.nextKeyValue();
              if (gotValue) {
                // Copy the key/value: Hadoop reuses these objects on the
                // next call to nextKeyValue(), so they must not escape the
                // synchronized block by reference.
                key = new Text(context.getCurrentKey());
                value = new Text(context.getCurrentValue());
              }
            }
            if (gotValue) {
              map(key, value, context);
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        } while (gotValue);
      }
    }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.