spark-user mailing list archives

From Fengdong Yu <fengdo...@everstring.com>
Subject Re: spark to hbase
Date Wed, 28 Oct 2015 02:23:29 GMT
Also, please move the HBase-related code into a Scala object; this will resolve the serialization issue and avoid opening the connection repeatedly.

And remember to close the table after the final flush.
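A minimal sketch of that pattern (HBaseSink is a placeholder name; the connection settings are taken from the snippets below): a Scala object is initialized lazily on each executor JVM rather than being serialized with the task closure, so the connection is opened once per executor and reused.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.HTable

    // Hypothetical singleton: a Scala object is instantiated per executor JVM
    // and never shipped with the closure, so the Configuration and HTable do
    // not need to be serializable.
    object HBaseSink {
      lazy val table: HTable = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "192.168.1.66")
        new HTable(conf, "ljh_test3")
      }
    }

    // In the job: reference HBaseSink.table inside foreach/foreachPartition,
    // call flushCommits() once per partition, and close() after the final flush.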



> On Oct 28, 2015, at 10:13 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> 
> For #2, have you checked the task log(s) to see if there was some clue?
> 
> You may want to use foreachPartition to reduce the number of flushes.
> 
> In the future, please remove color coding - it is not easy to read.
> 
> Cheers
> 
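A rough sketch of the foreachPartition suggestion, assuming the same HBase client API used in the snippets below (values stands for the RDD produced by the aggregateByKey pipeline, and res.toKey()/res.positiveCount come from the thread): one table, one batched write, and one flush per partition instead of per record.

    import scala.collection.JavaConverters._
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes

    values.foreachPartition { iter =>
      // Everything HBase-related is created on the executor, inside the
      // closure, so nothing non-serializable is captured from the driver.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set("hbase.zookeeper.quorum", "192.168.1.66")
      val table = new HTable(conf, "ljh_test3")
      val puts = iter.map { res =>
        val put = new Put(Bytes.toBytes(res.toKey()))
        put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
        put
      }.toList.asJava
      table.put(puts)      // one batched write per partition
      table.flushCommits() // one flush per partition
      table.close()
    }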
> On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinhong2@gmail.com> wrote:
> Hi, Ted
> 
> thanks for your help.
> 
> I check the jar, it is in classpath, and now the problem is :
> 
> 1. The following code runs fine, and it puts the result into HBase:
> 
>   val res = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.first()
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1.66:60000");
>   val table = new HTable(configuration, "ljh_test3");
>   val put = new Put(Bytes.toBytes(res.toKey()));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>   table.put(put);
>   table.flushCommits()
> 
> 2. But if I change first() to foreach:
> 
>   lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>     val configuration = HBaseConfiguration.create();
>     configuration.set("hbase.zookeeper.property.clientPort", "2181");
>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>     configuration.set("hbase.master", "192.168.1.66:60000");
>     val table = new HTable(configuration, "ljh_test3");
>     val put = new Put(Bytes.toBytes(res.toKey()));
>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>     table.put(put);
>   })
> 
> the application hung, and the last log lines were:
> 
> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with curMem=264045, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 265.2 MB)
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with curMem=271077, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 265.1 MB)
> 15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
> 15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
> 15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
> 15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
> 15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
> 15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
> 15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154 bytes
> 
> 3. Besides, if I take the configuration and the HTable out of the foreach:
> 
> val configuration = HBaseConfiguration.create();
> configuration.set("hbase.zookeeper.property.clientPort", "2181");
> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
> configuration.set("hbase.master", "192.168.1.66:60000");
> val table = new HTable(configuration, "ljh_test3");
> 
> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>   val put = new Put(Bytes.toBytes(res.toKey()));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>   table.put(put);
> })
> table.flushCommits()
> 
> I got a serialization problem:
> 
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
>         at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
>         at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
>         at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> Serialization stack:
>         - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
>         - field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
>         - object (class com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
>         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>         ... 21 more
> 
> 
> 
> 
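As an aside not raised in the thread: this closure-serialization issue can also be sidestepped with Spark's Hadoop output path, where the Configuration travels through job setup instead of the task closure. A sketch, assuming the HBase mapreduce classes are on the classpath and that values is the RDD from the pipeline above:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "192.168.1.66")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "ljh_test3")
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    values.map { res =>
      val put = new Put(Bytes.toBytes(res.toKey()))
      put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
      // TableOutputFormat ignores the key; the Put carries the row key.
      (new ImmutableBytesWritable, put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)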
>> On Oct 28, 2015, at 09:26, Ted Yu <yuzhihong@gmail.com> wrote:
>> 
>> Jinghong:
>> In one of the earlier threads on storing data to HBase, it was found that the htrace jar was not on the classpath, leading to a write failure.
>> 
>> Can you check whether you are facing the same problem ?
>> 
>> Cheers
>> 
>> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>> Jinghong:
>> The hadmin variable is not used; you can omit that line.
>> 
>> Which hbase release are you using ?
>> 
>> As Deng said, don't flush per row. 
>> 
>> Cheers
>> 
>> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <oching@apache.org> wrote:
>> 
>>> Hi,
>>> 
>>> It would be more efficient if you configured the table and flushed the commits per partition instead of per element in the RDD. The latter works fine because you only have 4 elements, but it won't bode well for large data sets IMO.
>>> 
>>> Thanks,
>>> Deng
>>> 
>>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinhong2@gmail.com> wrote:
>>> 
>>> 
>>> Hi, 
>>> 
>>> I write my result to HDFS, and it works well:
>>> 
>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values
>>> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);
>>> 
>>> But when I want to write to HBase, the application hangs with no log and no response; it just stays there, and nothing is written to HBase:
>>> 
>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>>   val configuration = HBaseConfiguration.create();
>>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>   configuration.set("hbase.master", "192.168.1.66:60000");
>>>   val hadmin = new HBaseAdmin(configuration);
>>>   val table = new HTable(configuration, "ljh_test3");
>>>   val put = new Put(Bytes.toBytes(res.toKey()));
>>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>>>   table.put(put);
>>>   table.flushCommits()
>>> })
>>> 
>>> And then I tried writing some simple data to HBase, which also worked fine:
>>> 
>>> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>>> val configuration = HBaseConfiguration.create();
>>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>> configuration.set("hbase.master", "192.168.1:60000");
>>> val hadmin = new HBaseAdmin(configuration);
>>> val table = new HTable(configuration, "ljh_test3");
>>> val put = new Put(Bytes.toBytes(res));
>>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>>> table.put(put);
>>> table.flushCommits()
>>> })
>>> 
>>> What is the problem with the second code snippet? Thanks a lot.
>>> 
>>> 
>> 
> 
> 

