spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Amit Singh Hora <hora.a...@gmail.com>
Subject SPARKONHBase checkpointing issue
Date Tue, 27 Oct 2015 11:53:28 GMT
Hi all ,

I am using Cloudera's SparkObHbase to bulk insert in hbase ,Please find
below code
object test {
  
    def main(args: Array[String]): Unit = {
    
    
    
   val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val checkpointDirectory=conf.getString("spark.checkpointDir")
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{
      functionToCreateContext(checkpointDirectory)
    })
 
    
    ssc.start()
    ssc.awaitTermination()
    
         }
    
    def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={
      println("always gets created")
           val hconf = HBaseConfiguration.create();
    val timeout= conf.getString("hbase.zookeepertimeout")
    val master=conf.getString("hbase.hbase_master")
    val zk=conf.getString("hbase.hbase_zkquorum")
    val zkport=conf.getString("hbase.hbase_zk_port")
    
      hconf.set("zookeeper.session.timeout",timeout);
    hconf.set("hbase.client.retries.number", Integer.toString(1));
    hconf.set("zookeeper.recovery.retry", Integer.toString(1));
    hconf.set("hbase.master", master);
    hconf.set("hbase.zookeeper.quorum",zk);
    hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
    hconf.set("hbase.zookeeper.property.clientPort",zkport );

   
    val hbaseContext = new HBaseContext(sc, hconf);
    return hbaseContext
    }
  def functionToCreateContext(checkpointDirectory: String): StreamingContext
= {
    println("creating for frst time")
    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
    val brokerlist = conf.getString("kafka.broker")
    val topic = conf.getString("kafka.topic")
    
    val Array(brokers, topics) = Array(brokerlist, topic)


    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample
" )
    sparkConf.set("spark.cleaner.ttl", "2");
    sparkConf.setMaster("local[2]")
    

     val topicsSet = topic.split(",").toSet
        val batchduration = conf.getString("spark.batchduration").toInt
    val ssc: StreamingContext = new StreamingContext(sparkConf,
Seconds(batchduration))
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
     val kafkaParams = Map[String, String]("metadata.broker.list" ->
brokerlist, "auto.offset.reset" -> "smallest")
    val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines=messages.map(_._2)
   


    getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines,
      "ecs_test",
      (putRecord) => {
        if (putRecord.length() > 0) {
          var maprecord = new HashMap[String, String];
                  val mapper = new ObjectMapper();

                  //convert JSON string to Map
                  maprecord = mapper.readValue(putRecord,
                    new TypeReference[HashMap[String, String]]() {});
                  
                  var ts: Long = maprecord.get("ts").toLong
                  
                   var tweetID:Long= maprecord.get("id").toLong
              val key=ts+"_"+tweetID;
                  
                  val put = new Put(Bytes.toBytes(key))
                  maprecord.foreach(kv => {
                 
         
put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
          

              })
        
        
          put
        } else {
          null
        }
      },
      false);

    ssc
      
  }
}

i am not able to retrieve from checkpoint after restart ,always get 
Unable to getConfig from broadcast

after debugging more i can see that the method for creating the HbaseContext
actually broadcasts the configuration ,context object passed

as a solution i just want to recreate the hbase context in every condition
weather the checkpoint exists or not



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message