spark-user mailing list archives

From Deng Ching-Mallete <och...@apache.org>
Subject Re: spark to hbase
Date Tue, 27 Oct 2015 10:21:37 GMT
Hi,

It would be more efficient to configure the table and flush the commits
once per partition instead of once per element in the RDD. The per-element
version works fine when you only have 4 elements, but it won't bode well
for large data sets, IMO.
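
Something along these lines is what I mean. It is only a rough sketch, not
tested against your cluster. It assumes `model` is the RDD you get when the
chain stops at `.values` (as in your HDFS version), it reuses the old HTable
client calls from your code, and the `setAutoFlush(false, false)` line is my
own addition so that puts are buffered on the client until the flush.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Assumption: `model` is the RDD of TrainFeature values from the first snippet.
model.foreachPartition { rows =>
  // One configuration and one table per partition, not per element.
  // ZooKeeper settings are copied from your code.
  val configuration = HBaseConfiguration.create()
  configuration.set("hbase.zookeeper.property.clientPort", "2181")
  configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
  val table = new HTable(configuration, "ljh_test3")
  // Assumption: buffer puts client-side instead of sending each one immediately.
  table.setAutoFlush(false, false)
  try {
    rows.foreach { res =>
      val put = new Put(Bytes.toBytes(res.toKey()))
      put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
        Bytes.toBytes(res.totalCount + res.positiveCount))
      table.put(put)
    }
    // Flush once for the whole partition.
    table.flushCommits()
  } finally {
    // Release the connection even if a put fails.
    table.close()
  }
}
```

The point is that the expensive setup (configuration, table) happens once
per partition, and the write buffer is flushed once at the end instead of
after every element.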

Thanks,
Deng

On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinhong2@gmail.com> wrote:

>
> Hi,
>
> I write my result to hdfs, it did well:
>
> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values
>  model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);
>
> But when I want to write to hbase, the application hung, no log, no
> response, just stay there, and nothing is written to hbase:
>
> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1:60000");
>   val hadmin = new HBaseAdmin(configuration);
>   val table = new HTable(configuration, "ljh_test3");
>   var put = new Put(Bytes.toBytes(res.toKey()));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>   table.put(put);
>   table.flushCommits()
> })
>
> And then I try to write some simple data to hbase, it did well too:
>
> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
> val configuration = HBaseConfiguration.create();
> configuration.set("hbase.zookeeper.property.clientPort", "2181");
> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
> configuration.set("hbase.master", "192.168.1:60000");
> val hadmin = new HBaseAdmin(configuration);
> val table = new HTable(configuration, "ljh_test3");
> var put = new Put(Bytes.toBytes(res));
> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
> table.put(put);
> table.flushCommits()
> })
>
> What is the problem with the 2nd code? Thanks a lot.
>
>
