hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kris Nuttycombe <kris.nuttyco...@gmail.com>
Subject Re: Passing objects to Mapper
Date Tue, 20 Apr 2010 18:37:48 GMT
Having been down this road recently, I've gotten the best results by
using the Distributed Cache together with Java serialization. If
your LexicalizedParser is not serializable, you'll have to jump
through more hoops, but the basic approach is the same.

Here's the Scala code I use to publish a serialized resource to the
Distributed Cache and retrieve it again inside a task:

package socialmedia.common.hadoop

import socialmedia.common.util.Util._
import org.apache.hadoop.conf._
import org.apache.hadoop.filecache._
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.util.UUID
import net.lag.logging.Logger
import DistCacheResources._

/**
 * Helpers for shipping a single Java-serializable resource to tasks via the
 * Hadoop Distributed Cache.
 */
object DistCacheResources {
  private val log = Logger.get
  /** Directory (relative to the user's home directory) that holds serialized resources. */
  val serializedResourceDir = "dist_cache_resource.serfile"
  /** Configuration key under which the resource's UUID is recorded. */
  val serializedResourceUUID = "dist_cache_resource.uuid"

  /**
   * Serializes `resource` to `<home>/dist_cache_resource.serfile/<uuid>.ser`.
   * It then records `uuid` in `conf` and registers the file with the Distributed Cache.
   *
   * Only one UUID is stored per Configuration. A second call overwrites the
   * recorded UUID, although the earlier file stays registered.
   *
   * @param conf     configuration that will be used to submit the job
   * @param uuid     identifier used both as the file name and as the lookup key in tasks
   * @param resource the object to ship; must be Java-serializable
   * @tparam T       type of the resource
   */
  def addToDistCache[T <: Serializable](conf: Configuration, uuid: UUID, resource: T): Unit = {
    // FileSystem.get hands out an instance cached and shared across the JVM.
    // Closing it here would break later users of the same instance, for
    // example job submission. So only the streams we open are closed.
    val fs = FileSystem.get(conf)
    val path = new Path(fs.getHomeDirectory, new Path(serializedResourceDir, uuid.toString + ".ser"))
    log.info("Writing serialized resource to " + path)
    // Nested so the raw stream is still closed if the ObjectOutputStream
    // constructor (which writes a header) fails.
    using(fs.create(path)) { out =>
      using(new ObjectOutputStream(out)) { oout =>
        oout.writeObject(resource)
      }
    }

    conf.set(serializedResourceUUID, uuid.toString)
    DistributedCache.addCacheFile(path.toUri, conf)
  }
}

/**
 * Mixin that restores a resource published with
 * `DistCacheResources.addToDistCache` from the task's local copy of the
 * Distributed Cache.
 *
 * @tparam T type of the serialized resource. The cast on deserialization is
 *           unchecked (erased), so a type mismatch only surfaces later as a
 *           ClassCastException where the value is used.
 */
trait DistCacheResources[T] extends Configurable {
  // Starts as None (not null) so `resource` is safe to call before `init`.
  private var _resource: Option[T] = None
  private var _conf: Configuration = _

  /** The deserialized resource; None until `init` runs, or if no matching cache file was found. */
  def resource: Option[T] = _resource

  override def setConf(conf: Configuration) = _conf = conf
  override def getConf = _conf

  /**
   * Finds the local cache file named `<uuid>.ser`, where the UUID is read
   * from `conf`, and deserializes it into `resource`.
   *
   * @param conf the job/task configuration
   * @throws IllegalArgumentException if `conf` is null, carries no resource
   *         UUID, or the UUID is malformed
   */
  def init(conf: Configuration): Unit = {
    if (conf == null) throw new IllegalArgumentException("Configuration was null.")
    val uuidString = conf.get(serializedResourceUUID)
    if (uuidString == null)
      throw new IllegalArgumentException(
        "No value for " + serializedResourceUUID + " in configuration; was addToDistCache called?")
    val uuid = UUID.fromString(uuidString)
    log.info("Will attempt to obtain serialized resource for uuid: " + uuid)

    // getLocalCacheFiles returns null when no cache files are available.
    val localFiles = DistributedCache.getLocalCacheFiles(conf)
    val cacheFiles: Array[Path] = if (localFiles == null) new Array[Path](0) else localFiles
    log.info("Available cache files: " + cacheFiles.mkString(", "))

    val wantedName = uuid.toString + ".ser"
    // FileSystem.getLocal returns a cached, JVM-wide instance, so it is
    // deliberately not closed; only the streams opened here are closed.
    val localFs = FileSystem.getLocal(conf)
    _resource = cacheFiles.find(_.getName == wantedName).map { resPath =>
      using(localFs.open(resPath)) { in =>
        using(new ObjectInputStream(in)) { oin =>
          oin.readObject.asInstanceOf[T]
        }
      }
    }
  }
}

Then you can write a mapper like this:

/**
 * Example mapper that obtains a LexicalizedParser shipped through the
 * Distributed Cache; after `setup`, the parser is available via `resource`.
 */
class MyMapper extends Mapper[BW, BW, Text, BW]
    with DistCacheResources[LexicalizedParser] {
  /** Restores the cached parser once per task, before any records are mapped. */
  override protected def setup(context: Mapper[BW, BW, Text, BW]#Context): Unit = {
    // The task context always carries the job configuration, whereas getConf
    // is only populated if the framework happened to call setConf.
    init(context.getConfiguration)
  }

  // ...
}

And when configuring your job, just call
DistCacheResources.addToDistCache(conf, uuid, parser) on the job's
Configuration before submitting it.


Kris

On Tue, Apr 20, 2010 at 5:16 AM, Rishav <superishav@gmail.com> wrote:
> Hello
>
> I am trying to pass an object of type LexicalizedParser (which is from an
> imported jar file stanford-parser) from the main method to the Mapper Class.
> This is to load the trained model first and then apply on every file offset
> in the map function in the Mapper Class.
> Could you tell me the best way to pass an object of any type to the Mapper?
>
> Thanking you
> Rishav
>
>
> --
> Rishav Bhowmick
> Computer Science -2010
> Carnegie Mellon University
> Tel: +974-561-4371
> e-mail: rishavb@cmu.edu
>
>

Mime
View raw message