From: tedyu@apache.org
To: commits@hbase.apache.org
Subject: hbase git commit: HBASE-17905: [hbase-spark] bulkload does not work when table not exist
Date: Wed, 12 Apr 2017 00:01:12 +0000 (UTC)
Message-Id: <4d587dfeb43c4fad869b7ed74547d9e8@git.apache.org>

Repository: hbase
Updated Branches:
  refs/heads/master 0b5bd78d6 -> 22f602cab


HBASE-17905: [hbase-spark] bulkload does not work when table not exist

Signed-off-by: tedyu

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/22f602ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/22f602ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/22f602ca

Branch: refs/heads/master
Commit: 22f602cab5e9739a650fc962f4b08a0ccc51a972
Parents: 0b5bd78
Author: Yi Liang
Authored: Tue Apr 11 15:30:13 2017 -0700
Committer: tedyu
Committed: Tue Apr 11 17:01:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/BulkLoadPartitioner.scala | 13 ++++++++-----
 .../apache/hadoop/hbase/spark/HBaseContext.scala | 18 +++++++++++++++++-
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
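In short, the patch adds two guards to the Spark bulk-load path: an already-existing staging directory now fails fast with a FileAlreadyExistsException, and a non-existent table (empty region start keys) is logged and handled with a single-partition fallback instead of producing a zero-partition load. The standalone sketch below shows how a caller reaches these code paths; it is not part of this commit, and the table name, column family, row data, and staging path are hypothetical placeholders.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object BulkLoadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bulkload-sketch"))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // (rowKey, family, qualifier, value) records to write out as HFiles.
    val rdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"))))

    // After this patch: if the staging dir already exists, the call throws
    // FileAlreadyExistsException before any Spark work; if the table does
    // not exist, "Table ... was not found" is logged and BulkLoadPartitioner
    // falls back to one partition rather than zero.
    hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](
      rdd,
      TableName.valueOf("sketch_table"),
      t => Iterator((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)),
      "/tmp/bulkload-staging")
  }
}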
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f602ca/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
index ab4fc41..022c933 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -33,8 +33,8 @@ import org.apache.spark.Partitioner
 @InterfaceAudience.Public
 class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
   extends Partitioner {
-
-  override def numPartitions: Int = startKeys.length
+  // when table not exist, startKeys = Byte[0][]
+  override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
 
   override def getPartition(key: Any): Int = {
 
@@ -53,8 +53,11 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
         case _ =>
           key.asInstanceOf[Array[Byte]]
       }
-    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0) partition * -1 + -2
-    else partition
+    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0)
+      partition = partition * -1 + -2
+    if (partition < 0)
+      partition = 0
+    partition
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/22f602ca/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index e2891db..8c4e0f4 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
 import scala.collection.mutable
 
 /**
@@ -620,9 +620,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val hfileCompression = HFileWriterImpl
@@ -743,9 +751,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = HFileWriterImpl
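
----------------------------------------------------------------------
A note on the getPartition arithmetic patched above: java.util.Arrays.binarySearch returns -(insertionPoint) - 1 when the key is not an exact match, so partition * -1 + -2 equals insertionPoint - 1, i.e. the region whose start key immediately precedes the row key. On an empty startKeys array (the table-not-found case) the search returns -1, which the old code turned into the invalid partition -1; the new clamp maps it to 0, matching the single-partition fallback in numPartitions. The standalone sketch below walks through this; it is not part of the commit, and its unsigned lexicographic comparator is an assumption standing in for the one the real partitioner constructs.

import java.util

object GetPartitionSketch {
  // Unsigned lexicographic byte comparison; an assumption standing in
  // for the comparator the real BulkLoadPartitioner builds.
  val comparator: util.Comparator[Array[Byte]] = new util.Comparator[Array[Byte]] {
    override def compare(a: Array[Byte], b: Array[Byte]): Int = {
      var i = 0
      val n = math.min(a.length, b.length)
      while (i < n) {
        val d = (a(i) & 0xff) - (b(i) & 0xff)
        if (d != 0) return d
        i += 1
      }
      a.length - b.length
    }
  }

  // Mirrors the patched getPartition body in the diff above.
  def getPartition(startKeys: Array[Array[Byte]], rowKey: Array[Byte]): Int = {
    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
    if (partition < 0)
      partition = partition * -1 + -2  // == insertionPoint - 1
    if (partition < 0)
      partition = 0                    // clamp: empty startKeys, or key before first region
    partition
  }

  def main(args: Array[String]): Unit = {
    // Three regions with start keys "", 10, 20.
    val startKeys: Array[Array[Byte]] =
      Array(Array.emptyByteArray, Array(10.toByte), Array(20.toByte))
    println(getPartition(startKeys, Array(5.toByte)))    // 0: falls in ["", 10)
    println(getPartition(startKeys, Array(10.toByte)))   // 1: exact match on a start key
    println(getPartition(startKeys, Array(25.toByte)))   // 2: falls in [20, inf)
    println(getPartition(Array.empty, Array(5.toByte)))  // 0: table-not-found fallback
  }
}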