spark-user mailing list archives

From Hao REN <julien19890...@gmail.com>
Subject Re: write data into HBase via spark
Date Thu, 14 Nov 2013 17:38:05 GMT
Hi, Philip.

Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the job. Since
HBase is not a file system, saveAsHadoopFile doesn't work.

def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter, which needs to be configured.
Essentially, you have to set the output format and the name of the output table.

// Step 1: JobConf setup

// Note: the mapred package (old Hadoop API) is used here, not the mapreduce
// package that contains the new Hadoop API.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
// ... some other imports

val conf = HBaseConfiguration.create()

// general HBase settings
conf.set("hbase.rootdir",
         "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 10000)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: the deprecated TableOutputFormat is used here, because JobConf
// belongs to the old Hadoop API.
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)


// Step 2: define your mapping

// The last thing to do is to map your local data schema to the HBase one.
// Say our HBase schema is as below:
//   row    cf:col_1    cf:col_2

// In Spark, you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)],
// where the Put carries the actual mapping.

// You can define a function to use with RDD.map, for example:

def convert(triple: (Int, Int, Int)) = {
  // the first element becomes the row key, the other two become cells
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}

// Suppose you have an RDD[(Int, Int, Int)] called localData; writing the
// data to HBase can then be done with:

new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)
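
By the way, if you import org.apache.spark.SparkContext._, the implicit
conversion to PairRDDFunctions lets you skip the explicit wrapper; a small
sketch, assuming that import is in scope:

localData.map(convert).saveAsHadoopDataset(jobConfig)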

Voilà. That's all you need. Hopefully this simple example helps.
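
In case it helps to see everything in one place, here is the whole thing
stitched together into a minimal, self-contained sketch. The cluster values
(nameNodeURL, hdfsPort, hostname, outputTable) below are just placeholders you
must replace with your own:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.PairRDDFunctions

object HBaseWrite {

  // map a triple to (row key, Put), as explained above
  def convert(triple: (Int, Int, Int)) = {
    val p = new Put(Bytes.toBytes(triple._1))
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, p)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HBaseWrite")

    // placeholder cluster settings -- replace with yours
    val nameNodeURL = "namenode.example.com"
    val hdfsPort = "8020"
    val hostname = "zookeeper.example.com"
    val outputTable = "test_table"

    val conf = HBaseConfiguration.create()
    conf.set("hbase.rootdir",
             "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
    conf.setBoolean("hbase.cluster.distributed", true)
    conf.set("hbase.zookeeper.quorum", hostname)

    val jobConfig = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)

    // a toy RDD of triples: (row key, col_1, col_2)
    val localData = sc.parallelize(List((1, 2, 3), (4, 5, 6)))
    new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)
  }
}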

Hao.

2013/11/13 Philip Ogren <philip.ogren@oracle.com>

>  Hao,
>
> If you have worked out the code and turn it into an example that you can
> share, then please do!  This task is in my queue of things to do so any
> helpful details that you uncovered would be most appreciated.
>
> Thanks,
> Philip
>
>
>
> On 11/13/2013 5:30 AM, Hao REN wrote:
>
> Ok, I worked it out.
>
>  The following thread helps a lot.
>
>
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E
>
>  Hao
>
>
> 2013/11/12 Hao REN <julien19890118@gmail.com>
>
>> Could someone show me a simple example about how to write data into HBase
>> via spark ?
>>
>>  I have checked HbaseTest example, it's only for reading from HBase.
>>
>>  Thank you.
>>
>>  --
>>  REN Hao
>>
>>  Data Engineer @ ClaraVista
>>
>>  Paris, France
>>
>>  Tel:  +33 06 14 54 57 24
>>
>
>
>
>  --
>  REN Hao
>
>  Data Engineer @ ClaraVista
>
>  Paris, France
>
>  Tel:  +33 06 14 54 57 24
>
>
>


-- 
REN Hao

Data Engineer @ ClaraVista

Paris, France

Tel:  +33 06 14 54 57 24
