hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [20/26] hbase git commit: HBASE-14795 Enhance the spark-hbase scan operations (Zhan Zhang)
Date Tue, 15 Dec 2015 17:43:34 GMT
HBASE-14795 Enhance the spark-hbase scan operations (Zhan Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/676ce01c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/676ce01c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/676ce01c

Branch: refs/heads/hbase-12439
Commit: 676ce01c82c137348e88d0acaa694ad214dc2f12
Parents: f34d3e1
Author: tedyu <yuzhihong@gmail.com>
Authored: Sun Dec 13 18:26:54 2015 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Sun Dec 13 18:26:54 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 100 +++-------
 .../hadoop/hbase/spark/HBaseContext.scala       |   2 +-
 .../hadoop/hbase/spark/datasources/Bound.scala  |  91 +++++++++
 .../spark/datasources/HBaseResources.scala      | 147 ++++++++++++++
 .../spark/datasources/HBaseTableScanRDD.scala   | 199 +++++++++++++++++++
 .../datasources/SerializableConfiguration.scala |  46 +++++
 .../hbase/spark/datasources/package.scala       |  36 ++++
 7 files changed, 545 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 23480bb..664cf57 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.spark
 import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.spark.datasources.{HBaseTableScanRDD, HBaseRegion, SerializableConfiguration}
 import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange,
-PositionedByteRange, Bytes}
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.DataType
@@ -159,7 +159,7 @@ class DefaultSource extends RelationProvider with Logging {
  *                                connection information
  * @param sqlContext              SparkSQL context
  */
-class HBaseRelation (val tableName:String,
+case class HBaseRelation (val tableName:String,
                      val schemaMappingDefinition:
                      java.util.HashMap[String, SchemaQualifierDefinition],
                      val batchingNum:Int,
@@ -179,6 +179,9 @@ class HBaseRelation (val tableName:String,
     new HBaseContext(sqlContext.sparkContext, config)
   }
 
+  val wrappedConf = new SerializableConfiguration(hbaseContext.config)
+  def hbaseConf = wrappedConf.value
+
   /**
    * Generates a Spark SQL schema object so Spark SQL knows what is being
    * provided by this BaseRelation
@@ -222,6 +225,7 @@ class HBaseRelation (val tableName:String,
    */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
= {
 
+
     val pushDownTuple = buildPushDownPredicatesResource(filters)
     val pushDownRowKeyFilter = pushDownTuple._1
     var pushDownDynamicLogicExpression = pushDownTuple._2
@@ -253,7 +257,6 @@ class HBaseRelation (val tableName:String,
     //retain the information for unit testing checks
     DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
       pushDownDynamicLogicExpression)
-    var resultRDD: RDD[Row] = null
 
     val getList = new util.ArrayList[Get]()
     val rddList = new util.ArrayList[RDD[Row]]()
@@ -268,77 +271,24 @@ class HBaseRelation (val tableName:String,
       getList.add(get)
     })
 
-    val rangeIt = pushDownRowKeyFilter.ranges.iterator
-
-    while (rangeIt.hasNext) {
-      val r = rangeIt.next()
-
-      val scan = new Scan()
-      scan.setBatch(batchingNum)
-      scan.setCaching(cachingNum)
-      requiredQualifierDefinitionList.foreach( d =>
-        if (d.columnFamilyBytes.length > 0)
-          scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-
-      if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
-        val pushDownFilterJava =
-          new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
-            valueArray, requiredQualifierDefinitionList)
-
-        scan.setFilter(pushDownFilterJava)
-      }
-
-      //Check if there is a lower bound
-      if (r.lowerBound != null && r.lowerBound.length > 0) {
-
-        if (r.isLowerBoundEqualTo) {
-          //HBase startRow is inclusive: Therefore it acts like  isLowerBoundEqualTo
-          // by default
-          scan.setStartRow(r.lowerBound)
-        } else {
-          //Since we don't equalTo we want the next value we need
-          // to add another byte to the start key.  That new byte will be
-          // the min byte value.
-          val newArray = new Array[Byte](r.lowerBound.length + 1)
-          System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
-
-          //new Min Byte
-          newArray(r.lowerBound.length) = Byte.MinValue
-          scan.setStartRow(newArray)
-        }
+    val pushDownFilterJava = if (usePushDownColumnFilter && pushDownDynamicLogicExpression
!= null) {
+        Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
+          valueArray, requiredQualifierDefinitionList))
+    } else {
+      None
+    }
+    val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava, requiredQualifierDefinitionList.seq)
+    pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
+    var resultRDD: RDD[Row] = {
+      val tmp = hRdd.map{ r =>
+        Row.fromSeq(requiredColumns.map(c =>
+          DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r)))
       }
-
-      //Check if there is a upperBound
-      if (r.upperBound != null && r.upperBound.length > 0) {
-        if (r.isUpperBoundEqualTo) {
-          //HBase stopRow is exclusive: therefore it DOESN'T ast like isUpperBoundEqualTo
-          // by default.  So we need to add a new max byte to the stopRow key
-          val newArray = new Array[Byte](r.upperBound.length + 1)
-          System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
-          //New Max Bytes
-          newArray(r.upperBound.length) = Byte.MaxValue
-          scan.setStopRow(newArray)
-        } else {
-          //Here equalTo is false for Upper bound which is exclusive and
-          // HBase stopRow acts like that by default so no need to mutate the
-          // rowKey
-          scan.setStopRow(r.upperBound)
-        }
+      if (tmp.partitions.size > 0) {
+        tmp
+      } else {
+        null
       }
-
-      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
-        Row.fromSeq(requiredColumns.map(c =>
-          DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
-      })
-      rddList.add(rdd)
-    }
-
-    //If there is more then one RDD then we have to union them together
-    for (i <- 0 until rddList.size()) {
-      if (resultRDD == null) resultRDD = rddList.get(i)
-      else resultRDD = resultRDD.union(rddList.get(i))
-
     }
 
     //If there are gets then we can get them from the driver and union that rdd in

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index f8c24f2..2d21e69 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -57,7 +57,7 @@ import scala.collection.mutable
   * to the working and managing the life cycle of HConnections.
  */
 class HBaseContext(@transient sc: SparkContext,
-                   @transient config: Configuration,
+                   @transient val config: Configuration,
                    val tmpHdfsConfgFile: String = null)
   extends Serializable with Logging {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
new file mode 100644
index 0000000..c869f31
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
+import org.apache.spark.Partition
+import org.apache.hadoop.hbase.spark.hbase._
+
+/**
+ * The Bound represents the boundary for the scan
+ *
+ * @param b The byte array of the bound
+ * @param inc inclusive or not.
+ */
+case class Bound(b: Array[Byte], inc: Boolean)
+/**
+ * The non-overlapping ranges we need to scan; if lower is equal to upper, it is a get
+ * request. A side that is `None` carries no bound.
+ *
+ * @param lower optional lower bound of the range
+ * @param upper optional upper bound of the range
+ */
+case class Range(lower: Option[Bound], upper: Option[Bound])
+
+object Range {
+  /**
+   * Builds the range covered by a region: the start key becomes an inclusive lower
+   * bound and a non-empty end key becomes an exclusive upper bound.
+   *
+   * @param region the region whose start/end keys delimit the range
+   * @return the range of the region; it has no upper bound when the end key is
+   *         missing or empty
+   */
+  def apply(region: HBaseRegion): Range = {
+    // A missing end key is treated like an empty one rather than failing on Option.get.
+    val upper = region.end.filter(_.length > 0).map(Bound(_, false))
+    Range(region.start.map(Bound(_, true)), upper)
+  }
+}
+
+object Ranges {
+  /**
+   * Intersects the region range `r` with every scan range in `rs`, dropping the
+   * intersections that turn out to be empty.
+   *
+   * We assume that
+   * 1. r.lower.inc is true, and r.upper.inc is false
+   * 2. for each range in rs, its upper.inc is false
+   *
+   * @param r  the range covered by a region
+   * @param rs the scan ranges requested by the query
+   * @return the non-empty intersections, in the order of `rs`
+   */
+  def and(r: Range, rs: Seq[Range]): Seq[Range] = {
+    rs.flatMap { s =>
+      // Lower bound of the intersection: the larger of the two lower bounds. On a tie
+      // the scan bound wins.
+      val lower: Option[Bound] = (s.lower, r.lower) match {
+        case (Some(scanLower), Some(regionLower)) =>
+          if (ord.compare(scanLower.b, regionLower.b) < 0) Some(regionLower) else Some(scanLower)
+        case (Some(scanLower), None) => Some(scanLower)
+        case (None, _) => r.lower
+      }
+
+      // Upper bound of the intersection: the smaller of the two upper bounds. On a tie
+      // the region bound wins, which is fine because the region stop key is exclusive.
+      val upper: Option[Bound] = (s.upper, r.upper) match {
+        case (Some(scanUpper), Some(regionUpper)) =>
+          if (ord.compare(scanUpper.b, regionUpper.b) >= 0) Some(regionUpper) else Some(scanUpper)
+        case (Some(scanUpper), None) => Some(scanUpper)
+        case (None, _) => r.upper
+      }
+
+      // The intersection is kept when either side is unbounded or lower < upper.
+      val keep = (lower, upper) match {
+        case (Some(lo), Some(hi)) => ord.compare(lo.b, hi.b) < 0
+        case _ => true
+      }
+      if (keep) Some(Range(lower, upper)) else None
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
new file mode 100644
index 0000000..4417520
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.spark.HBaseRelation
+
+// Resource and ReferencedResource are defined for extensibility,
+// e.g., to consolidate scan and bulkGet in future work.
+
+/**
+ * A resource that must be released by its user. The user has to invoke release
+ * explicitly to release the resource, and potentially its parent resources.
+ */
+trait Resource {
+  def release(): Unit
+}
+
+/**
+ * Pairs an open scanner with the table resource it was obtained from.
+ *
+ * @param tbr the table resource whose reference is given back on release
+ * @param rs  the result scanner to close on release
+ */
+case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource {
+  /**
+   * Closes the scanner and then releases the reference held on the table resource.
+   */
+  def release(): Unit = {
+    try {
+      rs.close()
+    } finally {
+      // Give the table reference back even when closing the scanner fails; otherwise
+      // the reference count never reaches zero and the table is never destroyed.
+      tbr.release()
+    }
+  }
+}
+
+/**
+ * A reference-counted resource: `init` runs when the count goes from 0 to 1 and
+ * `destroy` runs when the count drops back to 0.
+ */
+trait ReferencedResource {
+  var count: Int = 0
+
+  /** Creates the underlying resource; invoked by the first acquire. */
+  def init(): Unit
+
+  /** Tears the underlying resource down; invoked when the count reaches zero. */
+  def destroy(): Unit
+
+  /**
+   * Takes one reference, initializing the resource on the first one. When that
+   * fails the reference is released again and the error is rethrown.
+   */
+  def acquire() = synchronized {
+    try {
+      count += 1
+      if (count == 1) init()
+    } catch {
+      case failure: Throwable =>
+        release()
+        throw failure
+    }
+  }
+
+  /** Drops one reference and destroys the resource when the count reaches zero. */
+  def release() = synchronized {
+    count -= 1
+    if (count == 0) destroy()
+  }
+
+  /**
+   * Acquires a reference and evaluates `func`. The reference is kept when `func`
+   * succeeds and released again when it throws.
+   *
+   * @param func the computation that needs the resource
+   * @return the result of `func`
+   */
+  def releaseOnException[T](func: => T): T = {
+    acquire()
+    try {
+      func
+    } catch {
+      case failure: Throwable =>
+        release()
+        throw failure
+    }
+  }
+}
+
+/**
+ * Reference-counted connection and table handle for the table of a relation.
+ *
+ * @param relation supplies the HBase configuration and the table name
+ */
+case class TableResource(relation: HBaseRelation) extends ReferencedResource {
+  var connection: Connection = _
+  var table: Table = _
+
+  /** Opens the connection and the table handle. */
+  override def init(): Unit = {
+    connection = ConnectionFactory.createConnection(relation.hbaseConf)
+    table = connection.getTable(TableName.valueOf(relation.tableName))
+  }
+
+  /**
+   * Closes the table handle and the connection. The connection is closed even
+   * when closing the table fails, so that it cannot leak.
+   */
+  override def destroy(): Unit = {
+    try {
+      if (table != null) {
+        table.close()
+      }
+    } finally {
+      table = null
+      try {
+        if (connection != null) {
+          connection.close()
+        }
+      } finally {
+        connection = null
+      }
+    }
+  }
+
+  /**
+   * Opens a scanner on the table, taking one reference on this resource. The
+   * reference is released again if the scanner cannot be created.
+   *
+   * @param scan the scan to execute
+   * @return the scanner together with this table resource
+   */
+  def getScanner(scan: Scan): ScanResource = releaseOnException {
+    ScanResource(this, table.getScanner(scan))
+  }
+}
+
+/**
+ * Reference-counted connection and region locator for the table of a relation.
+ * Constructing an instance acquires one reference and reads the region list; the
+ * caller has to invoke release once it is done.
+ *
+ * @param relation supplies the HBase configuration and the table name
+ */
+case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
+  var connection: Connection = _
+  var rl: RegionLocator = _
+  // Must stay below the two vars above: the acquire triggered here runs init(), and
+  // the var initializers would otherwise reset what init() assigned.
+  val regions = releaseOnException {
+    val keys = rl.getStartEndKeys
+    keys.getFirst.zip(keys.getSecond)
+      .zipWithIndex
+      .map { case ((startKey, endKey), idx) =>
+        HBaseRegion(idx,
+          Some(startKey),
+          Some(endKey),
+          Some(rl.getRegionLocation(startKey).getHostname))
+      }
+  }
+
+  /** Opens the connection and the region locator. */
+  override def init(): Unit = {
+    connection = ConnectionFactory.createConnection(relation.hbaseConf)
+    rl = connection.getRegionLocator(TableName.valueOf(relation.tableName))
+  }
+
+  /**
+   * Closes the region locator and the connection. The connection is closed even
+   * when closing the locator fails, so that it cannot leak.
+   */
+  override def destroy(): Unit = {
+    try {
+      if (rl != null) {
+        rl.close()
+      }
+    } finally {
+      rl = null
+      try {
+        if (connection != null) {
+          connection.close()
+        }
+      } finally {
+        connection = null
+      }
+    }
+  }
+}
+
+/**
+ * Implicit views that let a resource wrapper be used in place of the object it wraps.
+ */
+object HBaseResources {
+  /** Views a scan resource as the scanner it holds. */
+  implicit def ScanResToScan(sr: ScanResource): ResultScanner = sr.rs
+
+  /** Views a table resource as the table it holds. */
+  implicit def TableResToTable(tr: TableResource): Table = tr.table
+
+  /** Views a region resource as the regions it has read. */
+  implicit def RegionResToRegions(rr: RegionResource): Seq[HBaseRegion] = rr.regions
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
new file mode 100644
index 0000000..958a986
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.filter.Filter
+import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation,
SparkSQLPushDownFilter}
+import org.apache.hadoop.hbase.spark.hbase._
+import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.{TaskContext, Logging, Partition}
+import org.apache.spark.rdd.RDD
+
+import scala.collection.mutable
+
+
+/**
+ * An RDD that scans an HBase table. It creates one partition for every region that
+ * overlaps at least one of the row-key ranges registered through addRange.
+ *
+ * @param relation the relation supplying the table name, configuration and the
+ *                 batching/caching settings of the scans
+ * @param filter   optional push-down filter that is set on every scan
+ * @param columns  the columns to fetch; entries without a column family are skipped
+ */
+class HBaseTableScanRDD(relation: HBaseRelation,
+     @transient val filter: Option[SparkSQLPushDownFilter] = None,
+     val columns: Seq[SchemaQualifierDefinition] = Seq.empty
+     )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
+  var ranges = Seq.empty[Range]
+
+  // Copies `key` and appends one zero byte. With byte-wise key comparison this is
+  // the first row key that sorts strictly after `key`.
+  private def successorKey(key: Array[Byte]): Array[Byte] = {
+    val next = new Array[Byte](key.length + 1)
+    System.arraycopy(key, 0, next, 0, key.length)
+    next(key.length) = ByteMin
+    next
+  }
+
+  /**
+   * Registers a row-key range to scan. Bounds are normalized to the form a Scan
+   * understands: an inclusive start row and an exclusive stop row.
+   *
+   * @param r the range produced by the row-key push-down analysis
+   */
+  def addRange(r: ScanRange) = {
+    val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
+      if (r.isLowerBoundEqualTo) {
+        // HBase startRow is inclusive, so the bound can be used as it is.
+        Some(Bound(r.lowerBound, true))
+      } else {
+        // The scan start row is always inclusive (buildScan does not look at
+        // Bound.inc), so an exclusive lower bound has to start at the next key.
+        Some(Bound(successorKey(r.lowerBound), true))
+      }
+    } else {
+      None
+    }
+    val upper = if (r.upperBound != null && r.upperBound.length > 0) {
+      if (!r.isUpperBoundEqualTo) {
+        Some(Bound(r.upperBound, false))
+      } else {
+        // HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
+        // by default. Stopping at the next key keeps the upper bound row itself in
+        // the scan without including any other row.
+        Some(Bound(successorKey(r.upperBound), false))
+      }
+    } else {
+      None
+    }
+    ranges :+= Range(lower, upper)
+  }
+
+  /**
+   * Creates one partition per region that intersects the registered ranges. The
+   * region resource is released even when building the partitions fails.
+   */
+  override def getPartitions: Array[Partition] = {
+    val regions = RegionResource(relation)
+    try {
+      logDebug(s"There are ${regions.size} regions")
+      val overlapping = regions.flatMap { region =>
+        val rs = Ranges.and(Range(region), ranges)
+        if (rs.nonEmpty) {
+          if (log.isDebugEnabled) {
+            rs.foreach(x => logDebug(x.toString))
+          }
+          Some((region, rs))
+        } else {
+          None
+        }
+      }
+      // Partition indices are consecutive over the regions that were kept.
+      overlapping.zipWithIndex.map { case ((region, rs), idx) =>
+        HBaseScanPartition(idx, region, rs,
+          SerializedFilter.toSerializedTypedFilter(filter)): Partition
+      }.toArray
+    } finally {
+      regions.release()
+    }
+  }
+
+
+  /** The preferred location of a partition is the server recorded for its region. */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    split.asInstanceOf[HBaseScanPartition].regions.server.toSeq
+  }
+
+  // Builds the Scan for one range: start row inclusive, stop row exclusive, limited
+  // to the requested columns and carrying the optional push-down filter.
+  private def buildScan(range: Range,
+      filter: Option[SparkSQLPushDownFilter],
+      columns: Seq[SchemaQualifierDefinition]): Scan = {
+    val scan = (range.lower, range.upper) match {
+      case (Some(Bound(start, _)), Some(Bound(stop, _))) => new Scan(start, stop)
+      case (None, Some(Bound(stop, _))) => new Scan(Array[Byte](), stop)
+      case (Some(Bound(start, _)), None) => new Scan(start)
+      case (None, None) => new Scan()
+    }
+
+    columns.foreach { d =>
+      if (d.columnFamilyBytes.length > 0) {
+        scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+      }
+    }
+    scan.setBatch(relation.batchingNum)
+    scan.setCaching(relation.cachingNum)
+    filter.foreach(scan.setFilter(_))
+    scan
+  }
+
+  // Wraps a scanner into an iterator that releases the scanner as soon as it is
+  // exhausted.
+  private def toResultIterator(scanner: ScanResource): Iterator[Result] = {
+    new Iterator[Result] {
+      private var cur: Option[Result] = None
+      // Set once the scanner returned null, so that a repeated hasNext neither reads
+      // from the closed scanner nor releases it a second time.
+      private var finished = false
+
+      override def hasNext: Boolean = {
+        if (cur.isEmpty && !finished) {
+          val r = scanner.next()
+          if (r == null) {
+            finished = true
+            rddResources.release(scanner)
+          } else {
+            cur = Some(r)
+          }
+        }
+        cur.isDefined
+      }
+
+      override def next(): Result = {
+        hasNext
+        val ret = cur.getOrElse(throw new NoSuchElementException("next on exhausted scanner"))
+        cur = None
+        ret
+      }
+    }
+  }
+
+  lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
+
+  // Releases every scanner that is still registered, e.g. because the task failed
+  // or stopped before consuming all results.
+  private def close(): Unit = {
+    // Iterate over a snapshot: releasing a resource removes it from the underlying set.
+    rddResources.set.toList.foreach { resource =>
+      try {
+        rddResources.release(resource)
+      } catch {
+        case scala.util.control.NonFatal(e) =>
+          // Keep going so that one failing close does not leak the remaining scanners.
+          logWarning("Failed to release a scan resource", e)
+      }
+    }
+  }
+
+  // Opens a scanner and registers it so that close() can release it later. Scanners
+  // are opened from several threads, hence the lock around the mutable set.
+  private def openScanner(resources: RDDResources,
+      tableResource: TableResource,
+      scan: Scan): ScanResource = {
+    val scanner = tableResource.getScanner(scan)
+    resources.synchronized {
+      resources.addResource(scanner)
+    }
+    scanner
+  }
+
+  /**
+   * Opens one scanner per range of the partition and returns their results
+   * concatenated in range order. Scanners are released when they are exhausted, and
+   * whatever is still open is released when the task completes.
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
+    val partition = split.asInstanceOf[HBaseScanPartition]
+
+    val scans = partition.scanRanges
+      .map(buildScan(_, SerializedFilter.fromSerializedFilter(partition.sf), columns))
+    val tableResource = TableResource(relation)
+    // Force the lazy val here, before the parallel section touches it.
+    val resources = rddResources
+    context.addTaskCompletionListener(context => close())
+    val sIts = scans.par
+      .map(openScanner(resources, tableResource, _))
+      .map(toResultIterator(_))
+      .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
+      x ++ y
+    }
+    sIts
+  }
+}
+
+/**
+ * Carries an optional push-down filter in its byte-array form.
+ *
+ * @param b the bytes of the filter, or None when there is no filter
+ */
+case class SerializedFilter(b: Option[Array[Byte]])
+
+object SerializedFilter {
+  /** Converts an optional push-down filter into its byte-array form. */
+  def toSerializedTypedFilter(f: Option[SparkSQLPushDownFilter]): SerializedFilter = {
+    val bytes = f.map(pushDown => pushDown.toByteArray)
+    SerializedFilter(bytes)
+  }
+
+  /** Parses the push-down filter back from its byte-array form, if there is one. */
+  def fromSerializedFilter(sf: SerializedFilter): Option[SparkSQLPushDownFilter] = {
+    sf.b.map(bytes => SparkSQLPushDownFilter.parseFrom(bytes))
+  }
+}
+
+/**
+ * Describes one region of a table.
+ *
+ * @param index  position of the region in the list it was read from
+ * @param start  start key of the region, if known
+ * @param end    end key of the region, if known
+ * @param server host name of the server holding the region, if known
+ */
+private[hbase] case class HBaseRegion(
+    override val index: Int,
+    val start: Option[HBaseType] = None,
+    val end: Option[HBaseType] = None,
+    val server: Option[String] = None) extends Partition
+
+
+/**
+ * A partition of the scan RDD: the ranges to scan inside one region.
+ *
+ * @param index      index of the partition within the RDD
+ * @param regions    the region this partition reads from
+ * @param scanRanges the row-key ranges to scan inside the region
+ * @param sf         the push-down filter in serialized form
+ */
+private[hbase] case class HBaseScanPartition(
+    override val index: Int,
+    val regions: HBaseRegion,
+    val scanRanges: Seq[Range],
+    val sf: SerializedFilter) extends Partition
+
+/**
+ * Tracks the resources opened on behalf of an RDD so that they can be released
+ * together.
+ *
+ * @param set the resources currently tracked
+ */
+case class RDDResources(set: mutable.HashSet[Resource]) {
+  /** Starts tracking a resource. */
+  def addResource(s: Resource): Unit = {
+    set += s
+  }
+
+  /** Releases every tracked resource. */
+  def release(): Unit = {
+    // Walk a snapshot: release(rs) removes the entry from `set`, and removing from a
+    // mutable HashSet while iterating over it may skip elements.
+    set.toList.foreach(release(_))
+  }
+
+  /** Releases one resource and stops tracking it, even when the release fails. */
+  def release(rs: Resource): Unit = {
+    try {
+      rs.release()
+    } finally {
+      set.remove(rs)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
new file mode 100644
index 0000000..2452a52
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+
+import scala.util.control.NonFatal
+
+/**
+ * Makes a Hadoop Configuration usable with Java serialization: the wrapped value is
+ * transient and is written and read back through the Configuration's own
+ * write/readFields methods.
+ *
+ * @param value the wrapped configuration
+ */
+class SerializableConfiguration(@transient var value: Configuration)
+  extends Serializable {
+
+  // Serialization hook: default fields first, then the configuration itself.
+  private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+    out.defaultWriteObject()
+    value.write(out)
+  }
+
+  // Deserialization hook: rebuilds the configuration from the stream, starting from
+  // a Configuration created with the `false` constructor argument.
+  private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+    value = new Configuration(false)
+    value.readFields(in)
+  }
+
+  /**
+   * Runs `block`, rethrowing IOExceptions unchanged and wrapping any other
+   * non-fatal error into an IOException.
+   */
+  def tryOrIOException(block: => Unit): Unit = {
+    try {
+      block
+    } catch {
+      case io: IOException => throw io
+      case NonFatal(other) => throw new IOException(other)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
new file mode 100644
index 0000000..4ff0413
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.math.Ordering
+
+package object hbase {
+  /** Keys and values are handled as raw byte arrays. */
+  type HBaseType = Array[Byte]
+
+  // The byte with all bits set (0xFF).
+  val ByteMax = (-1).toByte
+  // The zero byte (0x00).
+  val ByteMin = 0.toByte
+
+  /** Orders byte arrays by delegating to Bytes.compareTo. */
+  val ord: Ordering[HBaseType] = new Ordering[HBaseType] {
+    def compare(x: Array[Byte], y: Array[Byte]): Int = Bytes.compareTo(x, y)
+  }
+
+  //Do not use BinaryType.ordering
+  implicit val order: Ordering[HBaseType] = ord
+
+}


Mime
View raw message