From: busbey@apache.org
To: commits@hbase.apache.org
Subject: hbase git commit: HBASE-18175 Add hbase-spark integration test into hbase-spark-it
Date: Fri, 14 Jul 2017 17:20:25 +0000 (UTC)

Repository: hbase
Updated Branches:
  refs/heads/branch-2 9cc940426 -> 53f354f95


HBASE-18175 Add hbase-spark integration test into hbase-spark-it

* adds module hbase-spark-it
* adds test IntegrationTestSparkBulkLoad
* adds resultant jar to bin assembly

Signed-off-by: Mike Drob
Signed-off-by: Sean Busbey


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53f354f9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53f354f9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53f354f9

Branch: refs/heads/branch-2
Commit: 53f354f95489a346bf24e5613883dcf9e5a044bb
Parents: 9cc9404
Author: Yi Liang
Authored: Wed Jul 12 17:12:52 2017 -0700
Committer: Sean Busbey
Committed: Fri Jul 14 11:30:46 2017 -0500

----------------------------------------------------------------------
 hbase-assembly/pom.xml                          |   6 +
 hbase-assembly/src/main/assembly/src.xml        |   1 +
 hbase-spark-it/pom.xml                          | 361 ++++++++++
 .../spark/IntegrationTestSparkBulkLoad.java     | 661 +++++++++++++++++++
 pom.xml                                         |   1 +
 5 files changed, 1030 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/53f354f9/hbase-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index bb6458e..667bfb9 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -269,6 +269,12 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpcore</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-spark-it</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>

http://git-wip-us.apache.org/repos/asf/hbase/blob/53f354f9/hbase-assembly/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/src/main/assembly/src.xml b/hbase-assembly/src/main/assembly/src.xml
index befcce0..e5d3faf 100644
--- 
a/hbase-assembly/src/main/assembly/src.xml +++ b/hbase-assembly/src/main/assembly/src.xml @@ -56,6 +56,7 @@ org.apache.hbase:hbase-shaded org.apache.hbase:hbase-shell org.apache.hbase:hbase-spark + org.apache.hbase:hbase-spark-it org.apache.hbase:hbase-testing-util org.apache.hbase:hbase-thrift http://git-wip-us.apache.org/repos/asf/hbase/blob/53f354f9/hbase-spark-it/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-spark-it/pom.xml b/hbase-spark-it/pom.xml new file mode 100644 index 0000000..d94d4b8 --- /dev/null +++ b/hbase-spark-it/pom.xml @@ -0,0 +1,361 @@ + + + + 4.0.0 + + hbase + org.apache.hbase + 2.0.0-alpha-2-SNAPSHOT + .. + + + hbase-spark-it + Apache HBase - Spark Integration Tests + Integration and System tests for HBase + + + + 1.6.0 + 2.10.4 + 2.10 + + **/Test*.java + **/IntegrationTest*.java + + 4g + + + + + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + + org.apache.maven.plugins + maven-source-plugin + + + + maven-assembly-plugin + + true + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${surefire.version} + + + org.apache.maven.surefire + surefire-junit4 + ${surefire.version} + + + + + ${integrationtest.include} + + + ${unittest.include} + **/*$* + + ${test.output.tofile} + false + false + + + + integration-test + integration-test + + integration-test + + + + verify + verify + + verify + + + + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + false + always + + 1800 + -enableassertions -Xmx${failsafe.Xmx} + -Djava.security.egd=file:/dev/./urandom -XX:+CMSClassUnloadingEnabled + -verbose:gc -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + + banned-jsr305 + + enforce + + + false + + + + + banned-hbase-spark + + enforce + + + true + + + + banned-scala + + enforce + + + true + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/spark-generated-classpath + + + + + + + + + + + + org.apache.hbase + hbase-annotations + test-jar + test + + + org.apache.hbase + hbase-common + jar + + + org.apache.hbase + hbase-protocol + + + org.apache.hbase + hbase-client + + + org.apache.hbase + hbase-rsgroup + + + org.apache.hbase + hbase-rsgroup + test-jar + test + + + org.apache.hbase + hbase-server + + + org.apache.hbase + hbase-spark + ${project.version} + + + org.apache.hbase + hbase-it + test-jar + + + org.apache.hbase + hbase-hadoop-compat + + + org.apache.hbase + ${compat.module} + ${project.version} + + + org.apache.hbase + hbase-testing-util + + + com.google.guava + guava + + + io.dropwizard.metrics + metrics-core + + + commons-logging + commons-logging + + + commons-cli + commons-cli + + + org.apache.commons + commons-math + + + commons-lang + commons-lang + + + org.apache.htrace + htrace-core + + + + io.netty + netty + ${netty.hadoop.version} + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + + org.scala-lang + scala-library + + + + org.scala-lang + scalap + + + com.google.code.findbugs + jsr305 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + test-jar + tests + test + + + + + + + skipIntegrationTests + + + skipIntegrationTests + + + + true + + + 
+ + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.7.2 + + + spark-integration-tests + + report-only + + + failsafe-report + + ${project.build.directory}/failsafe-reports + + + + + + + + + http://git-wip-us.apache.org/repos/asf/hbase/blob/53f354f9/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java new file mode 100644 index 0000000..2d84914 --- /dev/null +++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java @@ -0,0 +1,661 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark; + +import com.google.common.collect.Sets; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.TableName; + +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RegionSplitter; + +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; +import org.apache.spark.SerializableWritable; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.Partitioner; + +import 
org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Test Bulk Load and Spark on a distributed cluster.
+ * It starts a Spark job that creates linked chains.
+ * This test mimics {@link IntegrationTestBulkLoad} in mapreduce.
+ *
+ * Usage on cluster:
+ * spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad
+ * HBASE_HOME/lib/hbase-it-XXX-tests.jar -m slowDeterministic -Dhbase.spark.bulkload.chainlength=300
+ */
+public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestSparkBulkLoad.class);
+
+  // The number of partitions for randomly generated data
+  private static String BULKLOAD_PARTITIONS_NUM = "hbase.spark.bulkload.partitionsnum";
+  private static int DEFAULT_BULKLOAD_PARTITIONS_NUM = 3;
+
+  private static String BULKLOAD_CHAIN_LENGTH = "hbase.spark.bulkload.chainlength";
+  private static int DEFAULT_BULKLOAD_CHAIN_LENGTH = 200000;
+
+  private static String BULKLOAD_IMPORT_ROUNDS = "hbase.spark.bulkload.importround";
+  private static int DEFAULT_BULKLOAD_IMPORT_ROUNDS = 1;
+
+  private static String CURRENT_ROUND_NUM = "hbase.spark.bulkload.current.roundnum";
+
+  private static String NUM_REPLICA_COUNT_KEY = "hbase.spark.bulkload.replica.countkey";
+  private static int DEFAULT_NUM_REPLICA_COUNT = 1;
+
+  private static String BULKLOAD_TABLE_NAME = "hbase.spark.bulkload.tableName";
+  private static String DEFAULT_BULKLOAD_TABLE_NAME = "IntegrationTestSparkBulkLoad";
+
+  private static String BULKLOAD_OUTPUT_PATH = "hbase.spark.bulkload.output.path";
+
+  private static final String OPT_LOAD = "load";
+  private static final String OPT_CHECK = "check";
+
+  private boolean load = false;
+  private boolean check = false;
+
+  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
+  private static final byte[] SORT_FAM = Bytes.toBytes("S");
+  private static final byte[] DATA_FAM = Bytes.toBytes("D");
+
+  /**
+   * Runs the Spark jobs that load data into the HBase table.
+   */
+  public void runLoad() throws Exception {
+    setupTable();
+    int numImportRounds = getConf().getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS);
+    LOG.info("Running load with numIterations:" + numImportRounds);
+    for (int i = 0; i < numImportRounds; i++) {
+      runLinkedListSparkJob(i);
+    }
+  }
+
+  /**
+   * Runs a Spark job that creates the linked lists used for testing.
+   * @param iteration the iteration (round) number of this job
+   * @throws Exception
+   */
+  public void runLinkedListSparkJob(int iteration) throws Exception {
+    String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + " _load " +
+        EnvironmentEdgeManager.currentTime();
+
+    LOG.info("Running iteration " + iteration + " in Spark Job");
+
+    Path output = null;
+    if (conf.get(BULKLOAD_OUTPUT_PATH) == null) {
+      output = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
+    } else {
+      output = new Path(conf.get(BULKLOAD_OUTPUT_PATH));
+    }
+
+    SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
+    Configuration hbaseConf = new Configuration(getConf());
hbaseConf.setInt(CURRENT_ROUND_NUM, iteration); + int partitionNum = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM); + + + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf); + + + LOG.info("Partition RDD into " + partitionNum + " parts"); + List temp = new ArrayList<>(); + JavaRDD> rdd = jsc.parallelize(temp, partitionNum). + mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)), false); + + hbaseContext.bulkLoad(rdd, getTablename(), new ListToKeyValueFunc(), output.toUri().getPath(), + new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE); + + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin(); + Table table = conn.getTable(getTablename()); + RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { + // Create a new loader. + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + + // Load the HFiles into table. + loader.doBulkLoad(output, admin, table, regionLocator); + } + + + // Delete the files. + util.getTestFileSystem().delete(output, true); + jsc.close(); + } + + // See mapreduce.IntegrationTestBulkLoad#LinkedListCreationMapper + // Used to generate test data + public static class LinkedListCreationMapper implements + Function2, Iterator>> { + + SerializableWritable swConfig = null; + private Random rand = new Random(); + + public LinkedListCreationMapper(SerializableWritable conf) { + this.swConfig = conf; + } + + @Override + public Iterator> call(Integer v1, Iterator v2) throws Exception { + Configuration config = (Configuration) swConfig.value(); + int partitionId = v1.intValue(); + LOG.info("Starting create List in Partition " + partitionId); + + int partitionNum = config.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM); + int chainLength = config.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH); + int iterationsNum = config.getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS); + int iterationsCur = config.getInt(CURRENT_ROUND_NUM, 0); + List> res = new LinkedList<>(); + + + long tempId = partitionId + iterationsCur * partitionNum; + long totalPartitionNum = partitionNum * iterationsNum; + long chainId = Math.abs(rand.nextLong()); + chainId = chainId - (chainId % totalPartitionNum) + tempId; + + byte[] chainIdArray = Bytes.toBytes(chainId); + long currentRow = 0; + long nextRow = getNextRow(0, chainLength); + for(long i = 0; i < chainLength; i++) { + byte[] rk = Bytes.toBytes(currentRow); + // Insert record into a list + List tmp1 = Arrays.asList(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow)); + List tmp2 = Arrays.asList(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i)); + List tmp3 = Arrays.asList(rk, DATA_FAM, chainIdArray, Bytes.toBytes( + RandomStringUtils.randomAlphabetic(50))); + res.add(tmp1); + res.add(tmp2); + res.add(tmp3); + + currentRow = nextRow; + nextRow = getNextRow(i+1, chainLength); + } + return res.iterator(); + } + + /** Returns a unique row id within this chain for this index */ + private long getNextRow(long index, long chainLength) { + long nextRow = Math.abs(new Random().nextLong()); + // use significant bits from the random number, but pad with index to ensure it is unique + // this also ensures that we do not reuse row = 0 + // row collisions from multiple mappers are fine, since we guarantee unique chainIds + nextRow = nextRow - (nextRow % chainLength) + index; + return nextRow; + } + } + + + + 
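+  // Each record produced by LinkedListCreationMapper is a 4-element list of
+  // (row key, column family, qualifier, value) byte arrays; ListToKeyValueFunc below
+  // converts one such record into the KeyFamilyQualifier/value pair that the
+  // JavaHBaseContext.bulkLoad() call in runLinkedListSparkJob() consumes.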
public static class ListToKeyValueFunc implements + Function, Pair> { + @Override + public Pair call(List v1) throws Exception { + if (v1 == null || v1.size() != 4) { + return null; + } + KeyFamilyQualifier kfq = new KeyFamilyQualifier(v1.get(0), v1.get(1), v1.get(2)); + + return new Pair<>(kfq, v1.get(3)); + } + } + + /** + * After adding data to the table start a mr job to + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + public void runCheck() throws Exception { + LOG.info("Running check"); + String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" + EnvironmentEdgeManager.currentTime(); + + SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local"); + Configuration hbaseConf = new Configuration(getConf()); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf); + + Scan scan = new Scan(); + scan.addFamily(CHAIN_FAM); + scan.addFamily(SORT_FAM); + scan.setMaxVersions(1); + scan.setCacheBlocks(false); + scan.setBatch(1000); + int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT); + if (replicaCount != DEFAULT_NUM_REPLICA_COUNT) { + scan.setConsistency(Consistency.TIMELINE); + } + + // 1. Using TableInputFormat to get data from HBase table + // 2. Mimic LinkedListCheckingMapper in mapreduce.IntegrationTestBulkLoad + // 3. Sort LinkKey by its order ID + // 4. Group LinkKey if they have same chainId, and repartition RDD by NaturalKeyPartitioner + // 5. Check LinkList in each Partition using LinkedListCheckingFlatMapFunc + hbaseContext.hbaseRDD(getTablename(), scan).flatMapToPair(new LinkedListCheckingFlatMapFunc()) + .sortByKey() + .combineByKey(new createCombinerFunc(), new mergeValueFunc(), new mergeCombinersFunc(), + new NaturalKeyPartitioner(new SerializableWritable<>(hbaseConf))) + .foreach(new LinkedListCheckingForeachFunc(new SerializableWritable<>(hbaseConf))); + jsc.close(); + } + + private void runCheckWithRetry() throws Exception { + try { + runCheck(); + } catch (Throwable t) { + LOG.warn("Received " + StringUtils.stringifyException(t)); + LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not"); + runCheck(); + throw t; // we should still fail the test even if second retry succeeds + } + // everything green + } + + /** + * PairFlatMapFunction used to transfer to Tuple + */ + public static class LinkedListCheckingFlatMapFunc implements + PairFlatMapFunction, SparkLinkKey, SparkLinkChain> { + + @Override + public Iterable> call(Tuple2 v) + throws Exception { + Result value = v._2(); + long longRk = Bytes.toLong(value.getRow()); + List> list = new LinkedList<>(); + + for (Map.Entry entry : value.getFamilyMap(CHAIN_FAM).entrySet()) { + long chainId = Bytes.toLong(entry.getKey()); + long next = Bytes.toLong(entry.getValue()); + Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0); + long order = Bytes.toLong(CellUtil.cloneValue(c)); + Tuple2 tuple2 = + new Tuple2<>(new SparkLinkKey(chainId, order), new SparkLinkChain(longRk, next)); + list.add(tuple2); + } + return list; + } + } + + public static class createCombinerFunc implements + Function> { + @Override + public List call(SparkLinkChain v1) throws Exception { + List list = new LinkedList<>(); + list.add(v1); + return list; + } + } + + public static class mergeValueFunc implements + Function2, SparkLinkChain, List> { + @Override + public List call(List v1, SparkLinkChain v2) throws Exception { + if (v1 == 
null) + v1 = new LinkedList<>(); + v1.add(v2); + return v1; + } + } + + public static class mergeCombinersFunc implements + Function2, List, List> { + @Override + public List call(List v1, List v2) throws Exception { + v1.addAll(v2); + return v1; + } + } + + /** + * Class to figure out what partition to send a link in the chain to. This is based upon + * the linkKey's ChainId. + */ + public static class NaturalKeyPartitioner extends Partitioner { + + private int numPartions = 0; + public NaturalKeyPartitioner(SerializableWritable swConf) { + Configuration hbaseConf = (Configuration) swConf.value(); + numPartions = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM); + + } + + @Override + public int numPartitions() { + return numPartions; + } + + @Override + public int getPartition(Object key) { + if (!(key instanceof SparkLinkKey)) + return -1; + int hash = ((SparkLinkKey) key).getChainId().hashCode(); + return Math.abs(hash % numPartions); + + } + } + + /** + * Sort all LinkChain for one LinkKey, and test List + */ + public static class LinkedListCheckingForeachFunc + implements VoidFunction>> { + + private SerializableWritable swConf = null; + + public LinkedListCheckingForeachFunc(SerializableWritable conf) { + swConf = conf; + } + + @Override + public void call(Tuple2> v1) throws Exception { + long next = -1L; + long prev = -1L; + long count = 0L; + + SparkLinkKey key = v1._1(); + List values = v1._2(); + + for (SparkLinkChain lc : values) { + + if (next == -1) { + if (lc.getRk() != 0L) { + String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + + ". Chain:" + key.getChainId() + ", order:" + key.getOrder(); + throw new RuntimeException(msg); + } + next = lc.getNext(); + } else { + if (next != lc.getRk()) { + String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting " + + next + " but got " + lc.getRk() + ". Chain:" + key.getChainId() + + ", order:" + key.getOrder(); + throw new RuntimeException(msg); + } + prev = lc.getRk(); + next = lc.getNext(); + } + count++; + } + Configuration hbaseConf = (Configuration) swConf.value(); + int expectedChainLen = hbaseConf.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH); + if (count != expectedChainLen) { + String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got " + + count + ". Chain:" + key.getChainId() + ", order:" + key.getOrder(); + throw new RuntimeException(msg); + } + } + } + + /** + * Writable class used as the key to group links in the linked list. + * + * Used as the key emited from a pass over the table. + */ + public static class SparkLinkKey implements java.io.Serializable, Comparable { + + private Long chainId; + private Long order; + + public Long getOrder() { + return order; + } + + public Long getChainId() { + return chainId; + } + + public SparkLinkKey(long chainId, long order) { + this.chainId = chainId; + this.order = order; + } + + @Override + public int hashCode() { + return this.getChainId().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SparkLinkKey)) + return false; + SparkLinkKey otherKey = (SparkLinkKey) other; + return this.getChainId().equals(otherKey.getChainId()); + } + + @Override + public int compareTo(SparkLinkKey other) { + int res = getChainId().compareTo(other.getChainId()); + if (res == 0) + res= getOrder().compareTo(other.getOrder()); + return res; + } + } + + /** + * Writable used as the value emitted from a pass over the hbase table. 
+ */ + public static class SparkLinkChain implements java.io.Serializable, Comparable{ + + public Long getNext() { + return next; + } + + public Long getRk() { + return rk; + } + + + public SparkLinkChain(Long rk, Long next) { + this.rk = rk; + this.next = next; + } + + private Long rk; + private Long next; + + @Override + public int compareTo(SparkLinkChain linkChain) { + int res = getRk().compareTo(linkChain.getRk()); + if (res == 0) { + res = getNext().compareTo(linkChain.getNext()); + } + return res; + } + + @Override + public int hashCode() { + return getRk().hashCode() ^ getNext().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SparkLinkChain)) + return false; + SparkLinkChain otherKey = (SparkLinkChain) other; + return this.getRk().equals(otherKey.getRk()) && this.getNext().equals(otherKey.getNext()); + } + } + + + /** + * Allow the scan to go to replica, this would not affect the runCheck() + * Since data are BulkLoaded from HFile into table + * @throws IOException + * @throws InterruptedException + */ + private void installSlowingCoproc() throws IOException, InterruptedException { + int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT); + if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return; + + TableName t = getTablename(); + Admin admin = util.getAdmin(); + HTableDescriptor desc = admin.getTableDescriptor(t); + desc.addCoprocessor(IntegrationTestBulkLoad.SlowMeCoproScanOperations.class.getName()); + HBaseTestingUtility.modifyTableSync(admin, desc); + } + + @Test + public void testBulkLoad() throws Exception { + runLoad(); + installSlowingCoproc(); + runCheckWithRetry(); + } + + + private byte[][] getSplits(int numRegions) { + RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit(); + split.setFirstRow(Bytes.toBytes(0L)); + split.setLastRow(Bytes.toBytes(Long.MAX_VALUE)); + return split.split(numRegions); + } + + private void setupTable() throws IOException, InterruptedException { + if (util.getAdmin().tableExists(getTablename())) { + util.deleteTable(getTablename()); + } + + util.createTable( + getTablename(), + new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM}, + getSplits(16) + ); + + int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT); + if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return; + + TableName t = getTablename(); + HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount); + } + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(getConf()); + util.initializeCluster(1); + int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT); + if (LOG.isDebugEnabled() && replicaCount != DEFAULT_NUM_REPLICA_COUNT) { + LOG.debug("Region Replicas enabled: " + replicaCount); + } + + // Scale this up on a real cluster + if (util.isDistributedCluster()) { + util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM, String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM)); + util.getConfiguration().setIfUnset(BULKLOAD_IMPORT_ROUNDS, "1"); + } else { + util.startMiniMapReduceCluster(); + } + } + + @Override + protected void addOptions() { + super.addOptions(); + super.addOptNoArg(OPT_CHECK, "Run check only"); + super.addOptNoArg(OPT_LOAD, "Run load only"); + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + check = cmd.hasOption(OPT_CHECK); + load = cmd.hasOption(OPT_LOAD); + } + + @Override + public int runTestFromCommandLine() throws Exception { + if (load) { + 
      runLoad();
+    } else if (check) {
+      installSlowingCoproc();
+      runCheckWithRetry();
+    } else {
+      testBulkLoad();
+    }
+    return 0;
+  }
+
+  @Override
+  public TableName getTablename() {
+    return getTableName(getConf());
+  }
+
+  public static TableName getTableName(Configuration conf) {
+    return TableName.valueOf(conf.get(BULKLOAD_TABLE_NAME, DEFAULT_BULKLOAD_TABLE_NAME));
+  }
+
+  @Override
+  protected Set<String> getColumnFamilies() {
+    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
+        Bytes.toString(SORT_FAM));
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int status = ToolRunner.run(conf, new IntegrationTestSparkBulkLoad(), args);
+    System.exit(status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/53f354f9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6cc5a62..f3e17f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
     hbase-archetypes
     hbase-metrics-api
     hbase-metrics
+    <module>hbase-spark-it</module>
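
A minimal usage sketch, repeating the invocation from the class javadoc above; the jar
path (with its XXX version placeholder) and the chain-length value are illustrative and
depend on the actual build and cluster:

  spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad \
      HBASE_HOME/lib/hbase-it-XXX-tests.jar -m slowDeterministic \
      -Dhbase.spark.bulkload.chainlength=300

The test's own -load and -check options (see addOptions()) run only the load or only the
verification phase; with neither option, runTestFromCommandLine() runs both via
testBulkLoad().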