Return-Path: X-Original-To: apmail-spark-user-archive@minotaur.apache.org Delivered-To: apmail-spark-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 84AED18D94 for ; Tue, 27 Oct 2015 11:53:42 +0000 (UTC) Received: (qmail 95151 invoked by uid 500); 27 Oct 2015 11:53:37 -0000 Delivered-To: apmail-spark-user-archive@spark.apache.org Received: (qmail 95058 invoked by uid 500); 27 Oct 2015 11:53:37 -0000 Mailing-List: contact user-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@spark.apache.org Received: (qmail 95048 invoked by uid 99); 27 Oct 2015 11:53:37 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Oct 2015 11:53:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 52BFC1809D4 for ; Tue, 27 Oct 2015 11:53:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 3.487 X-Spam-Level: *** X-Spam-Status: No, score=3.487 tagged_above=-999 required=6.31 tests=[DKIM_ADSP_CUSTOM_MED=0.001, NML_ADSP_CUSTOM_MED=1.2, SPF_SOFTFAIL=0.972, URIBL_BLOCKED=0.001, URI_HEX=1.313] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id zdpnEbdYsQ9j for ; Tue, 27 Oct 2015 11:53:28 +0000 (UTC) Received: from mwork.nabble.com (mwork.nabble.com [162.253.133.43]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with ESMTP id 7417C43CD2 for ; Tue, 27 Oct 2015 11:53:28 +0000 (UTC) Received: from mben.nabble.com (unknown [162.253.133.72]) by mwork.nabble.com (Postfix) with ESMTP id 0776A2C32AD4 for ; Tue, 27 Oct 2015 04:54:23 -0700 (PDT) Date: Tue, 27 Oct 2015 
04:53:28 -0700 (MST) From: Amit Singh Hora To: user@spark.apache.org Message-ID: <1445946808074-25211.post@n3.nabble.com> Subject: SPARKONHBase checkpointing issue MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit Hi all , I am using Cloudera's SparkOnHBase to bulk insert in hbase ,Please find below code object test { def main(args: Array[String]): Unit = { val conf = ConfigFactory.load("connection.conf").getConfig("connection") val checkpointDirectory=conf.getString("spark.checkpointDir") val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{ functionToCreateContext(checkpointDirectory) }) ssc.start() ssc.awaitTermination() } def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={ println("always gets created") val hconf = HBaseConfiguration.create(); val timeout= conf.getString("hbase.zookeepertimeout") val master=conf.getString("hbase.hbase_master") val zk=conf.getString("hbase.hbase_zkquorum") val zkport=conf.getString("hbase.hbase_zk_port") hconf.set("zookeeper.session.timeout",timeout); hconf.set("hbase.client.retries.number", Integer.toString(1)); hconf.set("zookeeper.recovery.retry", Integer.toString(1)); hconf.set("hbase.master", master); hconf.set("hbase.zookeeper.quorum",zk); hconf.set("zookeeper.znode.parent", "/hbase-unsecure"); hconf.set("hbase.zookeeper.property.clientPort",zkport ); val hbaseContext = new HBaseContext(sc, hconf); return hbaseContext } def functionToCreateContext(checkpointDirectory: String): StreamingContext = { println("creating for frst time") val conf = ConfigFactory.load("connection.conf").getConfig("connection") val brokerlist = conf.getString("kafka.broker") val topic = conf.getString("kafka.topic") val Array(brokers, topics) = Array(brokerlist, topic) val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " ) sparkConf.set("spark.cleaner.ttl", "2"); sparkConf.setMaster("local[2]") val topicsSet = topic.split(",").toSet val batchduration = 
conf.getString("spark.batchduration").toInt val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(batchduration)) ssc.checkpoint(checkpointDirectory) // set checkpoint directory val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest") val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) val lines=messages.map(_._2) getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines, "ecs_test", (putRecord) => { if (putRecord.length() > 0) { var maprecord = new HashMap[String, String]; val mapper = new ObjectMapper(); //convert JSON string to Map maprecord = mapper.readValue(putRecord, new TypeReference[HashMap[String, String]]() {}); var ts: Long = maprecord.get("ts").toLong var tweetID:Long= maprecord.get("id").toLong val key=ts+"_"+tweetID; val put = new Put(Bytes.toBytes(key)) maprecord.foreach(kv => { put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2)) }) put } else { null } }, false); ssc } } I am not able to retrieve from checkpoint after restart, always get Unable to getConfig from broadcast after debugging more I can see that the method for creating the HbaseContext actually broadcasts the configuration, context object passed as a solution I just want to recreate the hbase context in every condition whether the checkpoint exists or not -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org