spark-user mailing list archives

From jinhong lu <lujinho...@gmail.com>
Subject Re: spark to hbase
Date Wed, 28 Oct 2015 03:04:40 GMT
I wrote a demo, but there is still no response, no error, and no log.

My versions are HBase 0.98, Hadoop 2.3, and Spark 1.4.

I run it in yarn-client mode. Any ideas? Thanks.


package com.lujinhong.sparkdemo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object SparkConnectHbase2 extends Serializable {

  def main(args: Array[String]): Unit = {
    new SparkConnectHbase2().toHbase()
  }

}

class SparkConnectHbase2 extends Serializable {

  def toHbase(): Unit = {
    val conf = new SparkConf().setAppName("ljh_ml3")
    val sc = new SparkContext(conf)

    sc.parallelize(Array(601, 701, 801, 901)).foreachPartition { partition =>
      // Build the HBase client on the executor, once per partition.
      val configuration = HBaseConfiguration.create()
      configuration.set("hbase.zookeeper.property.clientPort", "2181")
      configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
      configuration.set("hbase.master", "192.168.1.66:60000")
      val table = new HTable(configuration, "ljh_test4")
      try {
        // `partition` is an Iterator[Int]; write one row per element.
        partition.foreach { a =>
          val put = new Put(Bytes.toBytes(a.toString))
          put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(s"${a}value"))
          table.put(put)
        }
        table.flushCommits()
      } finally {
        table.close()
      }
    }

    sc.stop()
  }

}


> On Oct 28, 2015, at 10:23, Fengdong Yu <fengdongy@everstring.com> wrote:
> 
> Also, please move the HBase-related code into a Scala object. This will resolve the serialization
> issue and avoid opening connections repeatedly.
> 
> And remember to close the table after the final flush.
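A minimal sketch of that advice in plain Scala, so it runs without a cluster. `FakeTable` is a hypothetical stand-in for `HTable`, and `TableHolder` and `PerJvmClientDemo` are hypothetical names. Because the client is a `lazy val` inside a Scala `object`, a task body only refers to the object instead of capturing a client instance, and each executor JVM opens the client once, however many partitions it processes.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.hadoop.hbase.client.HTable so the
// sketch runs without HBase. It records rows and counts how often it is opened.
class FakeTable(val name: String) {
  val rows: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def put(row: String): Unit = synchronized { rows += row }
  def flushCommits(): Unit = ()
  def close(): Unit = ()
}

object FakeTable {
  val opens = new AtomicInteger(0)
  def open(name: String): FakeTable = {
    opens.incrementAndGet()
    new FakeTable(name)
  }
}

// One client per JVM: created lazily on first use inside a task and never
// serialized, because closures refer to the object rather than capture it.
object TableHolder {
  lazy val table: FakeTable = FakeTable.open("ljh_test4")
}

object PerJvmClientDemo {
  // Simulates several partitions processed in the same executor JVM and
  // returns how many times the client was opened.
  def run(partitions: Seq[Seq[Int]]): Int = {
    partitions.foreach { partition =>
      val table = TableHolder.table
      partition.foreach(a => table.put(a.toString))
      table.flushCommits()
    }
    FakeTable.opens.get
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(Seq(601, 701), Seq(801, 901)))) // prints 1
}
```

When to call `close()` on a shared per-JVM client (for example from a shutdown hook) is a separate decision this sketch leaves out; opening, flushing, and closing the table inside each `foreachPartition` call avoids that question at the cost of opening a table per partition.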
> 
> 
> 
>> On Oct 28, 2015, at 10:13 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>> 
>> For #2, have you checked the task log(s) for any clues?
>> 
>> You may want to use foreachPartition to reduce the number of flushes.
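To make the difference concrete, here is a small simulation under stated assumptions: a `Seq` of `Seq`s stands in for an RDD's partitions, and `CountingTable` is a hypothetical counter, not the HBase API. With n rows spread over p partitions, flushing per element costs n flushes, while flushing once per partition costs p.

```scala
// Hypothetical table stand-in that only counts calls; not the HBase API.
class CountingTable {
  var puts = 0
  var flushes = 0
  def put(row: Int): Unit = puts += 1
  def flushCommits(): Unit = flushes += 1
}

object FlushCountDemo {
  // Pattern from the foreach version: flush after every single row.
  def perElement(partitions: Seq[Seq[Int]]): Int = {
    val table = new CountingTable
    partitions.foreach(_.foreach { row =>
      table.put(row)
      table.flushCommits()
    })
    table.flushes
  }

  // foreachPartition pattern: put every row of the partition, flush once.
  def perPartition(partitions: Seq[Seq[Int]]): Int = {
    val table = new CountingTable
    partitions.foreach { partition =>
      partition.foreach(table.put)
      table.flushCommits()
    }
    table.flushes
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8))
    println(perElement(partitions))   // 8
    println(perPartition(partitions)) // 3
  }
}
```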
>> 
>> In the future, please remove the color coding; it is hard to read.
>> 
>> Cheers
>> 
>> On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinhong2@gmail.com> wrote:
>> Hi Ted,
>> 
>> Thanks for your help.
>> 
>> I checked the jar, and it is on the classpath. Now the problems are:
>> 
>> 1. The following code runs fine and puts the result into HBase:
>> 
>>   val res = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>     .aggregateByKey(new TrainFeature())(seqOp, combOp).values.first()
>>   val configuration = HBaseConfiguration.create()
>>   configuration.set("hbase.zookeeper.property.clientPort", "2181")
>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
>>   configuration.set("hbase.master", "192.168.1.66:60000")
>>   val table = new HTable(configuration, "ljh_test3")
>>   val put = new Put(Bytes.toBytes(res.toKey()))
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
>>   table.put(put)
>>   table.flushCommits()
>> 
>> 2. But if I change first() to foreach:
>> 
>>   lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>     .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach { res =>
>>     val configuration = HBaseConfiguration.create()
>>     configuration.set("hbase.zookeeper.property.clientPort", "2181")
>>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
>>     configuration.set("hbase.master", "192.168.1.66:60000")
>>     val table = new HTable(configuration, "ljh_test3")
>>     val put = new Put(Bytes.toBytes(res.toKey()))
>>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
>>     table.put(put)
>>   }
>> 
>> the application hangs, and the last log lines are:
>> 
>> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
>> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
>> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with curMem=264045, maxMem=278302556
>> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 265.2 MB)
>> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with curMem=271077, maxMem=278302556
>> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 265.1 MB)
>> 15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
>> 15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
>> 15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
>> 15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
>> 15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
>> 15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
>> 15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
>> 15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154 bytes
>> 
>> 3. Besides that, I moved the configuration and HTable out of foreach:
>> 
>> val configuration = HBaseConfiguration.create()
>> configuration.set("hbase.zookeeper.property.clientPort", "2181")
>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
>> configuration.set("hbase.master", "192.168.1.66:60000")
>> val table = new HTable(configuration, "ljh_test3")
>> 
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach { res =>
>>   val put = new Put(Bytes.toBytes(res.toKey()))
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
>>   table.put(put)
>> }
>> table.flushCommits()
>> 
>> and got a serialization error:
>> 
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
>>         at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
>>         at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
>>         at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>> Serialization stack:
>>         - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
>>         - field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
>>         - object (class com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
>>         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>>         ... 21 more
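The key line is the `field ... name: configuration$1` entry: the function passed to `foreach` holds the driver-side `Configuration` as a field, and Spark must Java-serialize that function to ship it to the executors. The sketch below reproduces this without Spark or Hadoop. `FakeConfiguration`, `CapturingWriter`, and `LocalWriter` are hypothetical stand-ins, and `isSerializable` only approximates the check done in `ClosureCleaner.ensureSerializable`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration, which
// (as the trace shows) does not implement java.io.Serializable.
class FakeConfiguration {
  val quorum = "192.168.1.66"
}

// Mirrors case 3: the configuration is built on the driver and the task
// function keeps a reference to it, so it becomes a field of the function.
class CapturingWriter(configuration: FakeConfiguration) extends (Int => String) with Serializable {
  def apply(row: Int): String = configuration.quorum + "/" + row
}

// Mirrors case 2: the configuration is created inside the task body, so
// nothing non-serializable travels with the function.
class LocalWriter extends (Int => String) with Serializable {
  def apply(row: Int): String = {
    val configuration = new FakeConfiguration
    configuration.quorum + "/" + row
  }
}

object ClosureSerializationDemo {
  // Can the task function be written with plain Java serialization?
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new CapturingWriter(new FakeConfiguration))) // false
    println(isSerializable(new LocalWriter))                            // true
  }
}
```

This is why case 2, which creates the configuration inside the task, gets past this check while case 3 does not; the hang in case 2 is a separate problem.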
>> 
>> 
>> 
>> 
>>> On Oct 28, 2015, at 09:26, Ted Yu <yuzhihong@gmail.com> wrote:
>>> 
>>> Jinhong:
>>> In one of the earlier threads on storing data to HBase, it turned out that the htrace jar was not on the classpath, which caused the writes to fail.
>>> 
>>> Can you check whether you are facing the same problem?
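One quick way to test that hypothesis is to try loading a class from the jar in question. This is a generic sketch with a hypothetical `ClasspathCheck` object; the htrace class name to pass depends on the htrace version bundled with your HBase release, so it is left as an argument rather than guessed.

```scala
object ClasspathCheck {
  // Returns true if the named class can be found by the context class loader
  // (falling back to this object's loader). Classes are not initialized.
  def onClasspath(className: String): Boolean = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }
  }

  def main(args: Array[String]): Unit =
    // Pass fully qualified class names, e.g. one from the htrace jar.
    args.foreach(name => println(s"$name -> ${onClasspath(name)}"))
}
```

Executors can see a different classpath from the driver, especially in yarn-client mode, so the check is most informative when it runs inside a task, e.g. `sc.parallelize(Seq(1)).map(_ => ClasspathCheck.onClasspath(name)).collect()`.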
>>> 
>>> Cheers
>>> 
>>> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>>> Jinhong:
>>> The hadmin variable is not used; you can omit that line.
>>> 
>>> Which HBase release are you using?
>>> 
>>> As Deng said, don't flush per row. 
>>> 
>>> Cheers
>>> 
>>> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <oching@apache.org> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> It would be more efficient to configure the table and flush the commits once per partition instead of once per element in the RDD. The per-element approach works here only because you have 4 elements; it won't bode well for large data sets IMO.
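A minimal sketch of the per-partition pattern described above, written against a hypothetical `TableClient` trait rather than the real `HTable` so it runs on its own (`PartitionWriter` and `RecordingClient` are also hypothetical): the client is opened once for the partition's iterator, every row is put, commits are flushed once, and `close()` runs in a `finally` block even if a put fails.

```scala
import scala.collection.mutable.Buffer

// Hypothetical minimal client interface standing in for HTable.
trait TableClient {
  def put(row: String): Unit
  def flushCommits(): Unit
  def close(): Unit
}

object PartitionWriter {
  // Possible body of foreachPartition: open once per partition, put every
  // row, flush once at the end, and always close.
  def writePartition(open: () => TableClient, rows: Iterator[Int]): Unit = {
    val table = open()
    try {
      rows.foreach(r => table.put(r.toString))
      table.flushCommits()
    } finally {
      table.close()
    }
  }
}

// Recording implementation used only to show the call sequence.
class RecordingClient(log: Buffer[String]) extends TableClient {
  def put(row: String): Unit = log += s"put $row"
  def flushCommits(): Unit = log += "flush"
  def close(): Unit = log += "close"
}

object PartitionWriterDemo {
  def main(args: Array[String]): Unit = {
    val log = scala.collection.mutable.ArrayBuffer.empty[String]
    val partitions = Seq(Seq(601, 701), Seq(801, 901))
    partitions.foreach { p =>
      PartitionWriter.writePartition(() => { log += "open"; new RecordingClient(log) }, p.iterator)
    }
    // open, put 601, put 701, flush, close, open, put 801, put 901, flush, close
    println(log.mkString(", "))
  }
}
```

In a Spark job the call would sit inside `rdd.foreachPartition { rows => ... }`, with the `open` function built inside that block so that it creates the `HBaseConfiguration` and `HTable` on the executor and nothing non-serializable is captured from the driver.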
>>>> 
>>>> Thanks,
>>>> Deng
>>>> 
>>>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinhong2@gmail.com> wrote:
>>>> 
>>>> 
>>>> Hi, 
>>>> 
>>>> I wrote my result to HDFS, and it worked fine:
>>>> 
>>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values
>>>> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath)
>>>> 
>>>> But when I try to write to HBase, the application hangs: no log, no response, it just stays there, and nothing is written to HBase:
>>>> 
>>>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach { res =>
>>>>   val configuration = HBaseConfiguration.create()
>>>>   configuration.set("hbase.zookeeper.property.clientPort", "2181")
>>>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
>>>>   configuration.set("hbase.master", "192.168.1.66:60000")
>>>>   val hadmin = new HBaseAdmin(configuration)
>>>>   val table = new HTable(configuration, "ljh_test3")
>>>>   val put = new Put(Bytes.toBytes(res.toKey()))
>>>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount))
>>>>   table.put(put)
>>>>   table.flushCommits()
>>>> }
>>>> 
>>>> Then I tried writing some simple data to HBase, and that worked too:
>>>> 
>>>> sc.parallelize(Array(1, 2, 3, 4)).foreach { res =>
>>>>   val configuration = HBaseConfiguration.create()
>>>>   configuration.set("hbase.zookeeper.property.clientPort", "2181")
>>>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
>>>>   configuration.set("hbase.master", "192.168.1.66:60000")
>>>>   val hadmin = new HBaseAdmin(configuration)
>>>>   val table = new HTable(configuration, "ljh_test3")
>>>>   val put = new Put(Bytes.toBytes(res))
>>>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res))
>>>>   table.put(put)
>>>>   table.flushCommits()
>>>> }
>>>> 
>>>> What is the problem with the second snippet? Thanks a lot.
>>>> 
>>>> 
>>> 
>> 
>> 
> 

