hbase-user mailing list archives

From Felix Cheung <felixcheun...@hotmail.com>
Subject Re: Zeppelin using Spark to access Hbase throws error
Date Sat, 29 Oct 2016 20:53:08 GMT
When you run the code in spark-shell, is that on the same machine where Zeppelin is running?

It looks like you are getting a socket connection timeout when Spark, launched from Zeppelin,
tries to connect to HBase.
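If the Zeppelin interpreter runs on a different host than spark-shell, the HBase client there may be falling back to a default (localhost) ZooKeeper quorum. One thing worth trying, a sketch only — the quorum host and port below are placeholders, not values from this thread — is to set the quorum explicitly before building the RDD, and to make sure hbase-site.xml is on the interpreter's classpath:

```scala
// Sketch: point the HBase client at the cluster explicitly.
// "zk-host" and "2181" are placeholder values, not taken from this thread.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk-host")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
```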

From: Mich Talebzadeh <mich.talebzadeh@gmail.com>
Sent: Saturday, October 29, 2016 1:30 PM
Subject: Zeppelin using Spark to access Hbase throws error
To: <users@zeppelin.apache.org>, <user@hbase.apache.org>

Spark 2.0.1, Zeppelin 0.6.1, hbase-1.2.3

The code below runs fine in spark-shell.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.functions._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.{HTable, Put}
import scala.util.Random
import scala.math._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import java.nio.ByteBuffer
val tableName = "MARKETDATAHBASE"
val conf = HBaseConfiguration.create()
// Add local HBase conf
conf.set(TableInputFormat.INPUT_TABLE, tableName)
//create rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
import org.apache.hadoop.hbase.CellUtil
val rdd1 = hBaseRDD.map(tuple => tuple._2).
  map(result => (result.getRow, result.getColumn("PRICE_INFO".getBytes(), "TICKER".getBytes()))).
  map(row => {
    // keep the cell with the most recent timestamp for this column
    val latest = row._2.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }
    (Bytes.toString(row._1), Bytes.toString(CellUtil.cloneValue(latest)))
  })
case class columns(KEY: String, TICKER: String)
val dfTICKER = rdd1.toDF.map(p => columns(p(0).toString, p(1).toString))

However, in Zeppelin it throws this error:

dfTICKER: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string]
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Sat Oct 29 21:02:41 BST 2016, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68599:
row 'MARKETDATAHBASE,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740,
hostname=rhes564,16201,1477246132044, seqNum=0

Is this related to the HBase region server?


Dr Mich Talebzadeh

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of
data or any other property which may arise from relying on this email's technical content
is explicitly disclaimed. The author will in no case be liable for any monetary damages arising
from such loss, damage or destruction.
