hadoop-mapreduce-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Randy Wilson (JIRA)" <j...@apache.org>
Subject [jira] Created: (MAPREDUCE-2123) Multiple threads per JVM
Date Mon, 11 Oct 2010 17:21:33 GMT
Multiple threads per JVM
------------------------

                 Key: MAPREDUCE-2123
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
            Reporter: Randy Wilson


I have a process that standardizes name and place strings, and requires access to java objects
that require a lot of RAM (800MB).  Hadoop (via Amazon elastic mapreduce) was running out
of memory, because it was firing up one JVM per task per slave.  Each JVM needed 1.5GB, and
6 of those blew out memory.

In this case, we don't need 6 different JVMs running--we only need one, but with multiple
threads.  I tried using a MultithreadedMapper, but it doesn't have a thread-safe "run()" method:
it makes 3 calls to the input source to read one "line", which doesn't work if multiple threads
are doing that.  So I had to override the run() method.  I ended up having to do so much work
to override the run() method that it was simpler to skip using the MultithreadedMapper at
all.  Instead, I took my original mapper and just overrode the run() method there directly.
 I fired up n threads, each of which called a method that had a synchronized(mutex) around
the part of the process that made the three calls to an input source to get the next line
to operate on.  Then, outside of the synchronized block, it called the map() method, which
made use of the large, shared (singleton) standardization object.

All of this made me wonder why hadoop fires up multiple JVMs per slave in the first place--that
is a lot of overhead to use per thread.   I've also been warned that doing continual reuse
of JVMs instead of restarting one per task will use up more memory.  That seems like it should
only be true if hadoop (or our mapper) is leaking memory.  If that's the case, let's get it
fixed.

My guess is that since hadoop can run tasks in languages other than Java--and since other
languages may have less overhead per process--that firing up a JVM per task (or per thread)
simplifies hadoop.  But the multithreaded solution we did was very general-purpose.  It seems
like it ought to be pretty much the default solution in java, and that a "...map.threads"
property should be all that is required to fire up that many threads to help with each task,
rather than have to jump through the hoops we had to.

Below is the implementation that seems to be working:

In the main class:
    Configuration config = getConf();
    config.set("num_threads_per_jvm", Integer.toString(numThreads));
    Job job = new Job(config, "Standardize stuff");

In the Mapper class:
  public void run(final Context context) throws IOException, InterruptedException {
    int numThreads = Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm");
    setup(context); // setup and cleanup just once, rather than once per thread
    List<MapRunner> mapRunners = new ArrayList<MapRunner>();
    for (int i = 0; i < numThreads; i++) {
      MapRunner mapRunner = new MapRunner(context, i);
      mapRunners.add(mapRunner);
      mapRunner.start();
    }
     // Wait for all the threads to complete
    for (MapRunner mapRunner : mapRunners) {
      mapRunner.join();
    }
    cleanup(context);
  }

  private class MapRunner extends Thread {
    final Context context;

    private MapRunner(Context context) {
      this.context = context;
    }

    @Override
    public void run() {
      boolean gotValue = true;
      do {
        try {
          Text key = null;
          Text value = null;
          synchronized(contextMutex) {
            gotValue = context.nextKeyValue();
            if (gotValue) {
              key = context.getCurrentKey();
              value = context.getCurrentValue();
            }
          }
          if (gotValue) {
            map(key, value, context);
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      } while (gotValue);
    }
  }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message