hadoop-mapreduce-user mailing list archives

From Kris Nuttycombe <kris.nuttyco...@gmail.com>
Subject Re: Reflective instantiation of Mappers and Reducers
Date Fri, 02 Apr 2010 22:35:38 GMT
Or heck... I could just base-64 encode the serialized byte arrays and
pass them as strings in the configuration. If it's going to be a hack,
might as well go all the way.
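
A minimal sketch of that Base64 idea, using only the JDK (java.util.Base64, so Java 8 or later is assumed). The object name Base64Resource and its two helpers are hypothetical, not part of Hadoop. The resulting string could be stored with Configuration.set on the client and read back with Configuration.get inside setup():

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Hypothetical helper, not a Hadoop API: round-trips a Serializable value through a String.
object Base64Resource {
  // Java-serialize the value, then Base64-encode the bytes so they fit in a string property.
  def encode(value: java.io.Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // Reverse of encode: Base64-decode, then deserialize. The cast to T is unchecked.
  def decode[T](encoded: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The class of the serialized object still has to be on the task classpath for decode to work, and every task receives the whole job configuration, so this probably only suits small resources.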

On Fri, Apr 2, 2010 at 4:10 PM, Kris Nuttycombe
<kris.nuttycombe@gmail.com> wrote:
> On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <omalley@apache.org> wrote:
>>
>> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>>
>>> What I'm wondering is, is there any way to simply serialize a Mapper
>>> or Reducer object, and have the serialized instance copied, passed
>>> around and used everywhere instead of always having the Mapper and
>>> Reducer instantiated by reflection? This would greatly simplify
>>> library design in my case.
>>
>> Currently the best you can do is to make your Mapper or Reducer implement
>> Configurable and use the values out of the configuration.
>>
>> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
>> when it gets implemented.
>>
>> -- Owen
>>
>
> Thanks for the reference to that ticket, Owen. In the meantime, I
> think I may have figured out a workaround. The following code
> (completely untested as of yet, but a starting point) provides base
> classes for an implementation based upon the distributed cache:
>
>
> import org.apache.hadoop.conf._
> import org.apache.hadoop.util._
> import org.apache.hadoop.mapreduce._
> import java.io._
> import SerializingResourceToolRunner._
>
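> object SerializingResourceToolRunner {
>  val serializedResourceName = "socialmedia.mr_tool.serfile"
>
>  // Loan pattern: run f on the resource, then close it even if f throws.
>  def using[A <: Closeable, B](resource: A)(f: A => B): B =
>    try f(resource) finally resource.close()
> }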
>
> class SerializingResourceToolRunner[T <: Serializable](tool:
> SerializingResourceTool[T]) {
>  def runWithToolRunner(argv: Array[String]) = {
>    // Pull any existing "-files" option out of argv, keeping the other arguments in order.
>    def stripFileArg(i: Int, l: List[String], f: Option[String]): (List[String], Option[String]) = {
>      if (i >= argv.length) (l.reverse, f)
>      else if (argv(i) == "-files" && i + 1 < argv.length) stripFileArg(i + 2, l, Some(argv(i + 1)))
>      else stripFileArg(i + 1, argv(i) :: l, f)
>    }
>
>    val tempFile = File.createTempFile("mr_tool", ".ser")
>    using(new FileOutputStream(tempFile)) {
>      f => using(new ObjectOutputStream(f)) {
>        out => out.writeObject(tool.resource)
>      }
>    }
>
>    val (args, filesArg) = stripFileArg(0, Nil, None)
>
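>    // Configured.getConf returns null until a Configuration has been set, so fall back to a fresh one.
>    val conf = Option(tool.getConf).getOrElse(new Configuration)
>    conf.set(serializedResourceName, tempFile.getName)
>    val filesArgWithTempFile = filesArg.map(_ + "," + tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath)
>    ToolRunner.run(conf, tool, ("-files" :: filesArgWithTempFile :: args).toArray)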
>  }
> }
>
> trait Resources[T] {
>  private var _resource: T = _
>  def resource: T = _resource
>
>  def init(conf: Configuration): Unit = {
>    _resource = using(new FileInputStream(new
> File(conf.get(serializedResourceName)))) {
>      f => using(new ObjectInputStream(f)) {
>        in => in.readObject.asInstanceOf[T]
>      }
>    }
>  }
> }
>
> abstract class SerializedResourceMapper[T, KI, VI, KO, VO] extends
> Mapper[KI, VI, KO, VO] with Resources[T] {
>  override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
>    super.setup(context)
>    init(context.getConfiguration)
>  }
> }
>
> abstract class SerializedResourceReducer[T, KI, VI, KO, VO] extends
> Reducer[KI, VI, KO, VO] with Resources[T] {
>  override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
>    super.setup(context)
>    init(context.getConfiguration)
>  }
> }
>
> abstract class SerializingResourceTool[T <: Serializable] extends
> Configured with Tool {
>  def resource: T
> }
>
