spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Balachandar R.A." <balachandar...@gmail.com>
Subject Using Hadoop Custom Input format in Spark
Date Tue, 27 Oct 2015 16:53:31 GMT
Hello,


I have developed a hadoop based solution that process a binary file. This
uses classic hadoop MR technique. The binary file is about 10GB and divided
into 73 HDFS blocks, and the business logic written as map process operates
on each of these 73 blocks. We have developed a customInputFormat and
CustomRecordReader in Hadoop that returns key (intWritable) and value
(BytesWritable) to the map function. The value is nothing but the contents
of a HDFS block(bianry data). The business logic knows how to read this
data.

Now, I would like to port this code in spark. I am a starter in spark and
could run simple examples (wordcount, pi example) in spark. However, could
not straightforward example to process binaryFiles in spark. I see there
are two solutions for this use case. In the first, avoid using custom input
format and record reader. Find a method (approach) in spark the creates a
RDD for those HDFS blocks, use a map like method that feeds HDFS block
content to the business logic. If this is not possible, I would like to
re-use the custom input format and custom reader using some methods such as
HadoopAPI, HadoopRDD etc. My problem:- I do not know whether the first
approach is possible or not. If possible, can anyone please provide some
pointers that contains examples? I was trying second approach but highly
unsuccessful. Here is the code snippet I used

object Driver {
    def myFunc(key : IntWritable, content : BytesWritable) = {
       println("my business logic")
      // printing key and content value/size is 0
   }


def main(args: Array[String]) {
  // create a spark context
  val conf = new
SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
  val sc = new SparkContext(conf)
  val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
classOf[RandomAccessInputFormat], classOf[IntWritable],
classOf[BytesWritable])
  val count = rd.map (x => func(x._1, x._2)).collect()
   }
}

Can someone tell where I am doing wrong here? I think I am not using API
the right way but failed to find some documentation/usage examples.


Thanks in advancea

- bala

Mime
View raw message