From: ndimiduk@apache.org
To: commits@phoenix.apache.org
Date: Mon, 07 Dec 2015 23:02:15 -0000
Subject: [3/3] phoenix git commit: PHOENIX-2481 JSON bulkload tool

PHOENIX-2481 JSON bulkload tool

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ae4217c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ae4217c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ae4217c

Branch: refs/heads/4.x-HBase-1.0
Commit: 8ae4217ceaf965d50c7d5679d69aa3a0e118c628
Parents: 1e10e8b
Author: Nick Dimiduk
Authored: Mon Nov 16 17:18:34 2015 -0800
Committer: Nick Dimiduk
Committed: Mon Dec 7 13:11:14 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    |  16 +-
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 402 +++++++++++++++
 .../phoenix/mapreduce/CsvBulkImportUtil.java    |  20 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 514 +------------------
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  | 226 +-------
 .../phoenix/mapreduce/CsvToKeyValueReducer.java |  55 --
 .../mapreduce/FormatToKeyValueMapper.java       | 259 ++++++++++
 .../mapreduce/FormatToKeyValueReducer.java      |  54 ++
 .../ImportPreUpsertKeyValueProcessor.java       |   3 +-
 .../phoenix/mapreduce/JsonBulkLoadTool.java     |  53 ++
 .../phoenix/mapreduce/JsonToKeyValueMapper.java |  75 +++
 .../mapreduce/MultiHfileOutputFormat.java       |  38 +-
 .../mapreduce/bulkload/CsvTableRowkeyPair.java  | 139 -----
 .../mapreduce/bulkload/TableRowkeyPair.java     | 134 +++++
 .../mapreduce/bulkload/TargetTableRef.java      |  70 +++
 .../bulkload/TargetTableRefFunctions.java       |  95 ++++
 .../util/PhoenixConfigurationUtil.java          |  15 +-
 .../apache/phoenix/util/CSVCommonsLoader.java   | 160 +-----
 .../org/apache/phoenix/util/SchemaUtil.java     | 145 +++++-
 .../org/apache/phoenix/util/UpsertExecutor.java | 156 ++++++
 .../phoenix/util/csv/CsvUpsertExecutor.java     | 131 +----
 .../phoenix/util/json/JsonUpsertExecutor.java   | 209 ++++++++
 .../util/json/ObjectToArrayConverter.java       |  69 +++
 .../phoenix/mapreduce/BulkLoadToolTest.java     |  78 +++
 .../mapreduce/CsvBulkImportUtilTest.java        |  18 +-
 .../phoenix/mapreduce/CsvBulkLoadToolTest.java  |  69 ---
 .../mapreduce/CsvToKeyValueMapperTest.java      |  84 +-
 .../mapreduce/FormatToKeyValueMapperTest.java   | 102 ++++
 .../util/AbstractUpsertExecutorTest.java        | 136 +++++
 .../phoenix/util/csv/CsvUpsertExecutorTest.java | 144 ++---
 .../util/json/JsonUpsertExecutorTest.java       |  53 ++
 31 files changed, 2222 insertions(+), 1500 deletions(-)
----------------------------------------------------------------------
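The refactoring in this commit splits the existing CSV bulk loader into a generic AbstractBulkLoadTool / FormatToKeyValueMapper pipeline plus format-specific subclasses, which is what lets the new JsonBulkLoadTool reuse the same MapReduce job. For orientation, a minimal sketch of driving one of these tools programmatically through Hadoop's ToolRunner follows; the option letters (-t, -i, -z, -g) are the ones defined in AbstractBulkLoadTool below, while the driver class name, table name, input path, and quorum string are made-up placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

public class BulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // -t and -i are mandatory; -z points the job at the target cluster;
        // -g skips malformed input rows instead of failing the job.
        int exitCode = ToolRunner.run(conf, new CsvBulkLoadTool(), new String[] {
                "-t", "EXAMPLE_TABLE",      // hypothetical Phoenix table
                "-i", "/data/example.csv",  // hypothetical HDFS input path
                "-z", "zk1:2181",           // hypothetical ZooKeeper quorum
                "-g"
        });
        System.exit(exitCode);
    }
}

The same invocation should work for JsonBulkLoadTool as well, assuming it extends the same base class; its source is not shown in this part of the message.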
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 0e74d7b..a5b7488 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,14 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -47,6 +39,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
new file mode 100644
index 0000000..cf9ddef
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRef; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +/** + * Base tool for running MapReduce-based ingests of data. + */ +public abstract class AbstractBulkLoadTool extends Configured implements Tool { + + protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class); + + static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)"); + static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path (mandatory)"); + static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)"); + static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)"); + static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); + static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table"); + static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported"); + static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors"); + static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit"); + + /** + * Set configuration values based on parsed command line options. 
+ * + * @param cmdLine supplied command line options + * @param importColumns descriptors of columns to be imported + * @param conf job configuration + */ + protected abstract void configureOptions(CommandLine cmdLine, List importColumns, + Configuration conf) throws SQLException; + protected abstract void setupJob(Job job); + + protected Options getOptions() { + Options options = new Options(); + options.addOption(INPUT_PATH_OPT); + options.addOption(TABLE_NAME_OPT); + options.addOption(INDEX_TABLE_NAME_OPT); + options.addOption(ZK_QUORUM_OPT); + options.addOption(OUTPUT_PATH_OPT); + options.addOption(SCHEMA_NAME_OPT); + options.addOption(IMPORT_COLUMNS_OPT); + options.addOption(IGNORE_ERRORS_OPT); + options.addOption(HELP_OPT); + return options; + } + + /** + * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are + * missing. + * + * @param args supplied command line arguments + * @return the parsed command line + */ + protected CommandLine parseOptions(String[] args) { + + Options options = getOptions(); + + CommandLineParser parser = new PosixParser(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); + } + + if (cmdLine.hasOption(HELP_OPT.getOpt())) { + printHelpAndExit(options, 0); + } + + if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) { + throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " + + "parameter"); + } + + if (!cmdLine.getArgList().isEmpty()) { + throw new IllegalStateException("Got unexpected extra parameters: " + + cmdLine.getArgList()); + } + + if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) { + throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " + + "parameter"); + } + + return cmdLine; + } + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + printHelpAndExit(options, 1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("help", options); + System.exit(exitCode); + } + + @Override + public int run(String[] args) throws Exception { + + Configuration conf = HBaseConfiguration.create(getConf()); + + CommandLine cmdLine = null; + try { + cmdLine = parseOptions(args); + } catch (IllegalStateException e) { + printHelpAndExit(e.getMessage(), getOptions()); + } + return loadData(conf, cmdLine); + } + + private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException, + InterruptedException, ExecutionException, ClassNotFoundException { + String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt()); + String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()); + String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt()); + String qualifiedTableName = getQualifiedTableName(schemaName, tableName); + String qualifiedIndexTableName = null; + if (indexTableName != null){ + qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName); + } + + if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { + // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job. 
+ String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); + PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum); + LOG.info("Configuring HBase connection to {}", info); + for (Map.Entry entry : info.asProps()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue()); + } + conf.set(entry.getKey(), entry.getValue()); + } + } + + final Connection conn = QueryUtil.getConnection(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(), + qualifiedTableName); + } + List importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); + Preconditions.checkNotNull(importColumns); + Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty"); + FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns); + boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()); + conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows); + conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, tableName); + + // give subclasses their hook + configureOptions(cmdLine, importColumns, conf); + try { + validateTable(conn, schemaName, tableName); + } finally { + conn.close(); + } + + final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt())); + final Path outputPath; + if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { + outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); + } else { + outputPath = new Path("/tmp/" + UUID.randomUUID()); + } + + List tablesToBeLoaded = new ArrayList(); + tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName)); + // using conn after it's been closed... o.O + tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName)); + + // When loading a single index table, check index table name is correct + if (qualifiedIndexTableName != null){ + TargetTableRef targetIndexRef = null; + for (TargetTableRef tmpTable : tablesToBeLoaded){ + if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) { + targetIndexRef = tmpTable; + break; + } + } + if (targetIndexRef == null){ + throw new IllegalStateException("Bulk Loader error: index table " + + qualifiedIndexTableName + " doesn't exist"); + } + tablesToBeLoaded.clear(); + tablesToBeLoaded.add(targetIndexRef); + } + + return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded); + } + + /** + * Submits the jobs to the cluster. + * Loads the HFiles onto the respective tables. 
+ */ + public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath, + final Path outputPath , List tablesToBeLoaded) { + try { + Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName); + FileInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + + job.setInputFormatClass(TextInputFormat.class); + job.setMapOutputKeyClass(TableRowkeyPair.class); + job.setMapOutputValueClass(KeyValue.class); + job.setOutputKeyClass(TableRowkeyPair.class); + job.setOutputValueClass(KeyValue.class); + job.setReducerClass(FormatToKeyValueReducer.class); + + MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded); + + final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded); + job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson); + + // give subclasses their hook + setupJob(job); + + LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath); + boolean success = job.waitForCompletion(true); + + if (success) { + LOG.info("Loading HFiles from {}", outputPath); + completebulkload(conf,outputPath,tablesToBeLoaded); + } + + LOG.info("Removing output directory {}", outputPath); + if (!FileSystem.get(conf).delete(outputPath, true)) { + LOG.error("Removing output directory {} failed", outputPath); + } + return 0; + } catch(Exception e) { + LOG.error("Error {} occurred submitting BulkLoad ",e.getMessage()); + return -1; + } + + } + + private void completebulkload(Configuration conf,Path outputPath , List tablesToBeLoaded) throws Exception { + for(TargetTableRef table : tablesToBeLoaded) { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + String tableName = table.getPhysicalName(); + Path tableOutputPath = new Path(outputPath,tableName); + HTable htable = new HTable(conf,tableName); + LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath); + loader.doBulkLoad(tableOutputPath, htable); + LOG.info("Incremental load complete for table=" + tableName); + } + } + + /** + * Build up the list of columns to be imported. The list is taken from the command line if + * present, otherwise it is taken from the table description. + * + * @param conn connection to Phoenix + * @param cmdLine supplied command line options + * @param qualifiedTableName table name (possibly with schema) of the table to be imported + * @return the list of columns to be imported + */ + List buildImportColumns(Connection conn, CommandLine cmdLine, + String qualifiedTableName) throws SQLException { + List userSuppliedColumnNames = null; + if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) { + userSuppliedColumnNames = Lists.newArrayList( + Splitter.on(",").trimResults().split + (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt()))); + } + return SchemaUtil.generateColumnInfo( + conn, qualifiedTableName, userSuppliedColumnNames, true); + } + + /** + * Calculate the HBase HTable name for which the import is to be done. 
+ * + * @param schemaName import schema name, can be null + * @param tableName import table name + * @return the byte representation of the import HTable + */ + @VisibleForTesting + static String getQualifiedTableName(String schemaName, String tableName) { + if (schemaName != null) { + return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName), + SchemaUtil.normalizeIdentifier(tableName)); + } else { + return SchemaUtil.normalizeIdentifier(tableName); + } + } + + /** + * Perform any required validation on the table being bulk loaded into: + * - ensure no column family names start with '_', as they'd be ignored leading to problems. + * @throws java.sql.SQLException + */ + private void validateTable(Connection conn, String schemaName, + String tableName) throws SQLException { + + ResultSet rs = conn.getMetaData().getColumns( + null, StringUtil.escapeLike(schemaName), + StringUtil.escapeLike(tableName), null); + while (rs.next()) { + String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY); + if (familyName != null && familyName.startsWith("_")) { + if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) { + throw new IllegalStateException( + "Bulk Loader error: All column names that are not part of the " + + "primary key constraint must be prefixed with a column family " + + "name (i.e. f.my_column VARCHAR)"); + } else { + throw new IllegalStateException("Bulk Loader error: Column family name " + + "must not start with '_': " + familyName); + } + } + } + rs.close(); + } + + /** + * Get the index tables of current data table + * @throws java.sql.SQLException + */ + private List getIndexTables(Connection conn, String schemaName, String qualifiedTableName) + throws SQLException { + PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); + List indexTables = new ArrayList(); + for(PTable indexTable : table.getIndexes()){ + if (indexTable.getIndexType() == PTable.IndexType.LOCAL) { + throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader"); + /*indexTables.add( + new TargetTableRef(getQualifiedTableName(schemaName, + indexTable.getTableName().getString()), + MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */ + } else { + indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, + indexTable.getTableName().getString()))); + } + } + return indexTables; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index 6d77cd5..bdc67f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -17,15 +17,11 @@ */ package org.apache.phoenix.mapreduce; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Base64; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.util.ColumnInfo; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; /** * Collection of utility methods for setting up bulk import jobs. 
@@ -36,29 +32,19 @@ public class CsvBulkImportUtil { * Configure a job configuration for a bulk CSV import. * * @param conf job configuration to be set up - * @param tableName name of the table to be imported to, can include a schema name * @param fieldDelimiter field delimiter character for the CSV input * @param quoteChar quote character for the CSV input * @param escapeChar escape character for the CSV input * @param arrayDelimiter array delimiter character, can be null - * @param columnInfoList list of columns to be imported - * @param ignoreInvalidRows flag to ignore invalid input rows */ - public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, char quoteChar, char escapeChar, - String arrayDelimiter, List columnInfoList, boolean ignoreInvalidRows) { - - Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(columnInfoList); - Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty"); - conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName); + public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar, + char escapeChar, String arrayDelimiter) { setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter); setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar); setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar); if (arrayDelimiter != null) { conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter); } - CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoList); - conf.setBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index 20f05ff..e0b083e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -17,364 +17,37 @@ */ package org.apache.phoenix.mapreduce; -import java.io.IOException; -import java.sql.Connection; -import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import 
org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.StringUtil; -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +public class CsvBulkLoadTool extends AbstractBulkLoadTool { -/** - * Base tool for running MapReduce-based ingests of data. - */ -@SuppressWarnings("deprecation") -public class CsvBulkLoadTool extends Configured implements Tool { - - private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class); - - static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)"); - static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)"); - static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)"); - static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)"); - static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)"); - static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table"); static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma"); static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character"); static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash"); static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)"); - static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported"); - static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors"); - static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit"); - - public static void main(String[] args) throws Exception { - int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args); - System.exit(exitStatus); - } - - /** - * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are - * missing. 
- * - * @param args supplied command line arguments - * @return the parsed command line - */ - CommandLine parseOptions(String[] args) { - - Options options = getOptions(); - - CommandLineParser parser = new PosixParser(); - CommandLine cmdLine = null; - try { - cmdLine = parser.parse(options, args); - } catch (ParseException e) { - printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); - } - - if (cmdLine.hasOption(HELP_OPT.getOpt())) { - printHelpAndExit(options, 0); - } - - if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) { - throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " + - "parameter"); - } - - if (!cmdLine.getArgList().isEmpty()) { - throw new IllegalStateException("Got unexpected extra parameters: " - + cmdLine.getArgList()); - } - - if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) { - throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " + - "parameter"); - } - return cmdLine; - } - - private Options getOptions() { - Options options = new Options(); - options.addOption(INPUT_PATH_OPT); - options.addOption(TABLE_NAME_OPT); - options.addOption(INDEX_TABLE_NAME_OPT); - options.addOption(ZK_QUORUM_OPT); - options.addOption(OUTPUT_PATH_OPT); - options.addOption(SCHEMA_NAME_OPT); + @Override + protected Options getOptions() { + Options options = super.getOptions(); options.addOption(DELIMITER_OPT); options.addOption(QUOTE_OPT); options.addOption(ESCAPE_OPT); options.addOption(ARRAY_DELIMITER_OPT); - options.addOption(IMPORT_COLUMNS_OPT); - options.addOption(IGNORE_ERRORS_OPT); - options.addOption(HELP_OPT); return options; } - - private void printHelpAndExit(String errorMessage, Options options) { - System.err.println(errorMessage); - printHelpAndExit(options, 1); - } - - private void printHelpAndExit(Options options, int exitCode) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("help", options); - System.exit(exitCode); - } - @Override - public int run(String[] args) throws Exception { - - Configuration conf = HBaseConfiguration.create(getConf()); - - CommandLine cmdLine = null; - try { - cmdLine = parseOptions(args); - } catch (IllegalStateException e) { - printHelpAndExit(e.getMessage(), getOptions()); - } - return loadData(conf, cmdLine); - } - - private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException, - InterruptedException, ExecutionException, ClassNotFoundException { - String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt()); - String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()); - String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt()); - String qualifiedTableName = getQualifiedTableName(schemaName, tableName); - String qualifiedIndexTableName = null; - if (indexTableName != null){ - qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName); - } - - if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { - // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job. 
- String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); - PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum); - LOG.info("Configuring HBase connection to {}", info); - for (Map.Entry entry : info.asProps()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue()); - } - conf.set(entry.getKey(), entry.getValue()); - } - } - - final Connection conn = QueryUtil.getConnection(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(), - qualifiedTableName); - } - List importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); - configureOptions(cmdLine, importColumns, conf); - try { - validateTable(conn, schemaName, tableName); - } finally { - conn.close(); - } - - final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt())); - final Path outputPath; - if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { - outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); - } else { - outputPath = new Path("/tmp/" + UUID.randomUUID()); - } - - List tablesToBeLoaded = new ArrayList(); - tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName)); - // using conn after it's been closed... o.O - tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName)); - - // When loading a single index table, check index table name is correct - if (qualifiedIndexTableName != null){ - TargetTableRef targetIndexRef = null; - for (TargetTableRef tmpTable : tablesToBeLoaded){ - if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) { - targetIndexRef = tmpTable; - break; - } - } - if (targetIndexRef == null){ - throw new IllegalStateException("CSV Bulk Loader error: index table " + - qualifiedIndexTableName + " doesn't exist"); - } - tablesToBeLoaded.clear(); - tablesToBeLoaded.add(targetIndexRef); - } - - return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded); - } - - /** - * Submits the jobs to the cluster. - * Loads the HFiles onto the respective tables. 
- * @param configuration - * @param qualifiedTableName - * @param inputPath - * @param outputPath - * @param tablesToBeoaded - * @return status - */ - public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath, - final Path outputPath , List tablesToBeLoaded) { - try { - Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName); - - // Allow overriding the job jar setting by using a -D system property at startup - if (job.getJar() == null) { - job.setJarByClass(CsvToKeyValueMapper.class); - } - job.setInputFormatClass(TextInputFormat.class); - FileInputFormat.addInputPath(job, inputPath); - FileOutputFormat.setOutputPath(job, outputPath); - job.setMapperClass(CsvToKeyValueMapper.class); - job.setMapOutputKeyClass(CsvTableRowkeyPair.class); - job.setMapOutputValueClass(KeyValue.class); - job.setOutputKeyClass(CsvTableRowkeyPair.class); - job.setOutputValueClass(KeyValue.class); - job.setReducerClass(CsvToKeyValueReducer.class); - - MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded); - - final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded); - job.getConfiguration().set(CsvToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson); - - LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath); - boolean success = job.waitForCompletion(true); - - if (success) { - LOG.info("Loading HFiles from {}", outputPath); - completebulkload(conf,outputPath,tablesToBeLoaded); - } - - LOG.info("Removing output directory {}", outputPath); - if (!FileSystem.get(conf).delete(outputPath, true)) { - LOG.error("Removing output directory {} failed", outputPath); - } - return 0; - } catch(Exception e) { - LOG.error("Error {} occurred submitting CSVBulkLoad ",e.getMessage()); - return -1; - } - - } - - /** - * bulkload HFiles . - * @param conf - * @param outputPath - * @param tablesToBeLoaded - * @throws Exception - */ - private void completebulkload(Configuration conf,Path outputPath , List tablesToBeLoaded) throws Exception { - for(TargetTableRef table : tablesToBeLoaded) { - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - String tableName = table.getPhysicalName(); - Path tableOutputPath = new Path(outputPath,tableName); - HTable htable = new HTable(conf,tableName); - LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath); - loader.doBulkLoad(tableOutputPath, htable); - LOG.info("Incremental load complete for table=" + tableName); - } - } - - /** - * Build up the list of columns to be imported. The list is taken from the command line if - * present, otherwise it is taken from the table description. - * - * @param conn connection to Phoenix - * @param cmdLine supplied command line options - * @param qualifiedTableName table name (possibly with schema) of the table to be imported - * @return the list of columns to be imported - */ - List buildImportColumns(Connection conn, CommandLine cmdLine, - String qualifiedTableName) throws SQLException { - List userSuppliedColumnNames = null; - if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) { - userSuppliedColumnNames = Lists.newArrayList( - Splitter.on(",").trimResults().split - (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt()))); - } - return CSVCommonsLoader.generateColumnInfo( - conn, qualifiedTableName, userSuppliedColumnNames, true); - } - - /** - * Calculate the HBase HTable name for which the import is to be done. 
- * - * @param schemaName import schema name, can be null - * @param tableName import table name - * @return the byte representation of the import HTable - */ - @VisibleForTesting - static String getQualifiedTableName(String schemaName, String tableName) { - if (schemaName != null) { - return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName), - SchemaUtil.normalizeIdentifier(tableName)); - } else { - return SchemaUtil.normalizeIdentifier(tableName); - } - } - - /** - * Set configuration values based on parsed command line options. - * - * @param cmdLine supplied command line options - * @param importColumns descriptors of columns to be imported - * @param conf job configuration - */ - private static void configureOptions(CommandLine cmdLine, List importColumns, - Configuration conf) throws SQLException { + protected void configureOptions(CommandLine cmdLine, List importColumns, + Configuration conf) throws SQLException { // we don't parse ZK_QUORUM_OPT here because we need it in order to // create the connection we need to build importColumns. @@ -408,178 +81,23 @@ public class CsvBulkLoadTool extends Configured implements Tool { CsvBulkImportUtil.initCsvImportJob( conf, - getQualifiedTableName( - cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()), - cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())), delimiterChar, quoteChar, escapeChar, - cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()), - importColumns, - cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt())); + cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt())); } - /** - * Perform any required validation on the table being bulk loaded into: - * - ensure no column family names start with '_', as they'd be ignored leading to problems. - * @throws java.sql.SQLException - */ - private void validateTable(Connection conn, String schemaName, - String tableName) throws SQLException { - - ResultSet rs = conn.getMetaData().getColumns( - null, StringUtil.escapeLike(schemaName), - StringUtil.escapeLike(tableName), null); - while (rs.next()) { - String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY); - if (familyName != null && familyName.startsWith("_")) { - if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) { - throw new IllegalStateException( - "CSV Bulk Loader error: All column names that are not part of the " + - "primary key constraint must be prefixed with a column family " + - "name (i.e. 
f.my_column VARCHAR)"); - } else { - throw new IllegalStateException("CSV Bulk Loader error: Column family name " + - "must not start with '_': " + familyName); - } - } - } - rs.close(); - } - - /** - * Get the index tables of current data table - * @throws java.sql.SQLException - */ - private List getIndexTables(Connection conn, String schemaName, String qualifiedTableName) - throws SQLException { - PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); - List indexTables = new ArrayList(); - for(PTable indexTable : table.getIndexes()){ - if (indexTable.getIndexType() == IndexType.LOCAL) { - throw new UnsupportedOperationException("Local indexes not supported by CSV Bulk Loader"); - /*indexTables.add( - new TargetTableRef(getQualifiedTableName(schemaName, - indexTable.getTableName().getString()), - MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */ - } else { - indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, - indexTable.getTableName().getString()))); - } + @Override + protected void setupJob(Job job) { + // Allow overriding the job jar setting by using a -D system property at startup + if (job.getJar() == null) { + job.setJarByClass(CsvToKeyValueMapper.class); } - return indexTables; + job.setMapperClass(CsvToKeyValueMapper.class); } - /** - * Represents the logical and physical name of a single table to which data is to be loaded. - * - * This class exists to allow for the difference between HBase physical table names and - * Phoenix logical table names. - */ - static class TargetTableRef { - - @JsonProperty - private final String logicalName; - - @JsonProperty - private final String physicalName; - - @JsonProperty - private Map configuration = Maps.newHashMap(); - - private TargetTableRef(String name) { - this(name, name); - } - - @JsonCreator - private TargetTableRef(@JsonProperty("logicalName") String logicalName, @JsonProperty("physicalName") String physicalName) { - this.logicalName = logicalName; - this.physicalName = physicalName; - } - - public String getLogicalName() { - return logicalName; - } - - public String getPhysicalName() { - return physicalName; - } - - public Map getConfiguration() { - return configuration; - } - - public void setConfiguration(Map configuration) { - this.configuration = configuration; - } + public static void main(String[] args) throws Exception { + int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args); + System.exit(exitStatus); } - - /** - * Utility functions to get/put json. 
- * - */ - static class TargetTableRefFunctions { - - public static Function TO_JSON = new Function() { - - @Override - public String apply(TargetTableRef input) { - try { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(input); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - }; - - public static Function FROM_JSON = new Function() { - - @Override - public TargetTableRef apply(String json) { - try { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(json, TargetTableRef.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - }; - - public static Function,String> NAMES_TO_JSON = new Function,String>() { - - @Override - public String apply(List input) { - try { - List tableNames = Lists.newArrayListWithCapacity(input.size()); - for(TargetTableRef table : input) { - tableNames.add(table.getPhysicalName()); - } - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(tableNames); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - }; - - public static Function> NAMES_FROM_JSON = new Function>() { - - @SuppressWarnings("unchecked") - @Override - public List apply(String json) { - try { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(json, ArrayList.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - }; - } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index c3b5a7d..5a5d378 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -19,46 +19,20 @@ package org.apache.phoenix.mapreduce; import java.io.IOException; import java.io.StringReader; -import java.sql.SQLException; -import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - -import javax.annotation.Nullable; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRefFunctions; -import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.UpsertExecutor; import org.apache.phoenix.util.csv.CsvUpsertExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; import 
com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; /** * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles. @@ -67,12 +41,7 @@ import com.google.common.collect.Lists; * extracting the created KeyValues and rolling back the statement execution before it is * committed to HBase. */ -public class CsvToKeyValueMapper extends Mapper { - - private static final Logger LOG = LoggerFactory.getLogger(CsvToKeyValueMapper.class); - - private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import"; +public class CsvToKeyValueMapper extends FormatToKeyValueMapper { /** Configuration key for the field delimiter for input csv records */ public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter"; @@ -86,117 +55,25 @@ public class CsvToKeyValueMapper extends Mapper tableNames; + @Override + protected LineParser getLineParser() { + return lineParser; + } @Override protected void setup(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - - // pass client configuration into driver - Properties clientInfos = new Properties(); - Iterator> iterator = conf.iterator(); - while(iterator.hasNext()) { - Entry entry = iterator.next(); - clientInfos.setProperty(entry.getKey(), entry.getValue()); - } - - try { - conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); - } catch (SQLException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - - final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); - tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); - - upsertListener = new MapperUpsertListener( - context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); - csvUpsertExecutor = buildUpsertExecutor(conf); - csvLineParser = new CsvLineParser( + lineParser = new CsvLineParser( CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY), CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY), CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY)); - - preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - } - - @SuppressWarnings("deprecation") - @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - try { - CSVRecord csvRecord = null; - try { - csvRecord = csvLineParser.parse(value.toString()); - } catch (IOException e) { - context.getCounter(COUNTER_GROUP_NAME, "CSV Parser errors").increment(1L); - } - - if (csvRecord == null) { - context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L); - return; - } - csvUpsertExecutor.execute(ImmutableList.of(csvRecord)); - - Iterator>> uncommittedDataIterator - = PhoenixRuntime.getUncommittedDataIterator(conn, true); - while (uncommittedDataIterator.hasNext()) { - Pair> kvPair = uncommittedDataIterator.next(); - List keyValueList = kvPair.getSecond(); - keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); - byte[] first = kvPair.getFirst(); - for(String tableName : tableNames) { - if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) { - // skip edits for other tables - continue; - } - for (KeyValue kv : keyValueList) { - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); - outputKey.set(kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength()); - context.write(new CsvTableRowkeyPair(tableName, outputKey), kv); - } - } - } - conn.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - conn.close(); - } catch (SQLException e) { - throw new RuntimeException(e); - } } @VisibleForTesting - CsvUpsertExecutor buildUpsertExecutor(Configuration conf) { + @Override + protected UpsertExecutor buildUpsertExecutor(Configuration conf) { String tableName = conf.get(TABLE_NAME_CONFKEY); String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY, CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR); @@ -204,78 +81,14 @@ public class CsvToKeyValueMapper extends Mapper columnInfoList = buildColumnInfoList(conf); - return CsvUpsertExecutor.create(conn, tableName, columnInfoList, upsertListener, arraySeparator); - } - - /** - * Write the list of to-import columns to a job configuration. - * - * @param conf configuration to be written to - * @param columnInfoList list of ColumnInfo objects to be configured for import - */ - @VisibleForTesting - static void configureColumnInfoList(Configuration conf, List columnInfoList) { - conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList)); - } - - /** - * Build the list of ColumnInfos for the import based on information in the configuration. - */ - @VisibleForTesting - static List buildColumnInfoList(Configuration conf) { - - return Lists.newArrayList( - Iterables.transform( - Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)), - new Function() { - @Nullable - @Override - public ColumnInfo apply(@Nullable String input) { - if (input.isEmpty()) { - // An empty string represents a null that was passed in to - // the configuration, which corresponds to an input column - // which is to be skipped - return null; - } - return ColumnInfo.fromString(input); - } - })); - } - - /** - * Listener that logs successful upserts and errors to job counters. - */ - @VisibleForTesting - static class MapperUpsertListener implements CsvUpsertExecutor.UpsertListener { - - private final Context context; - private final boolean ignoreRecordErrors; - - private MapperUpsertListener(Context context, boolean ignoreRecordErrors) { - this.context = context; - this.ignoreRecordErrors = ignoreRecordErrors; - } - - @Override - public void upsertDone(long upsertCount) { - context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L); - } - - @Override - public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) { - LOG.error("Error on record " + csvRecord, throwable); - context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L); - if (!ignoreRecordErrors) { - throw Throwables.propagate(throwable); - } - } + return new CsvUpsertExecutor(conn, tableName, columnInfoList, upsertListener, arraySeparator); } /** * Parses a single CSV input line, returning a {@code CSVRecord}. 
*/ @VisibleForTesting - static class CsvLineParser { + static class CsvLineParser implements LineParser { private final CSVFormat csvFormat; CsvLineParser(char fieldDelimiter, char quote, char escape) { @@ -286,6 +99,7 @@ public class CsvToKeyValueMapper extends Mapper preUpsert(byte[] rowKey, List keyValues) { - return keyValues; - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java deleted file mode 100644 index 7e9c4fd..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce; - -import java.io.IOException; -import java.util.TreeSet; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair; - -/** - * Reducer class for the CSVBulkLoad job. - * Performs similar functionality to {@link KeyValueSortReducer} - * - */ -public class CsvToKeyValueReducer extends Reducer { - - @Override - protected void reduce(CsvTableRowkeyPair key, Iterable values, - Reducer.Context context) - throws IOException, InterruptedException { - TreeSet map = new TreeSet(KeyValue.COMPARATOR); - for (KeyValue kv: values) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv: map) { - context.write(key, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java new file mode 100644 index 0000000..b2e99e5 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.UpsertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * Base class for converting some input source format into {@link KeyValue}s of a target + * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster + * to retrieve {@link ColumnInfo} from the target table. + */ +public abstract class FormatToKeyValueMapper extends Mapper { + + protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class); + + protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import"; + + /** Configuration key for the name of the output table */ + public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename"; + + /** Configuration key for the name of the output index table */ + public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename"; + + /** Configuration key for the columns to be imported */ + public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos"; + + /** Configuration key for the flag to ignore invalid rows */ + public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow"; + + /** Configuration key for the table names */ + public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames"; + + /** Configuration key for the table configurations */ + public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config"; + + /** + * Parses a single input line, returning a {@code T}. 
+ */ + public interface LineParser<T> { + T parse(String input) throws IOException; + } + + protected PhoenixConnection conn; + protected UpsertExecutor<RECORD, ?> upsertExecutor; + protected ImportPreUpsertKeyValueProcessor preUpdateProcessor; + protected List<String> tableNames; + protected MapperUpsertListener<RECORD> upsertListener; + + protected abstract UpsertExecutor<RECORD, ?> buildUpsertExecutor(Configuration conf); + protected abstract LineParser<RECORD> getLineParser(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + + Configuration conf = context.getConfiguration(); + + // pass client configuration into driver + Properties clientInfos = new Properties(); + for (Map.Entry<String, String> entry : conf) { + clientInfos.setProperty(entry.getKey(), entry.getValue()); + } + + try { + conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + + final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); + tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); + + upsertListener = new MapperUpsertListener<RECORD>( + context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); + upsertExecutor = buildUpsertExecutor(conf); + preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + } + + @SuppressWarnings("deprecation") + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + try { + RECORD record = null; + try { + record = getLineParser().parse(value.toString()); + } catch (IOException e) { + context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L); + return; + } + + if (record == null) { + context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L); + return; + } + upsertExecutor.execute(ImmutableList.of(record)); + + Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator + = PhoenixRuntime.getUncommittedDataIterator(conn, true); + while (uncommittedDataIterator.hasNext()) { + Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); + List<KeyValue> keyValueList = kvPair.getSecond(); + keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); + byte[] first = kvPair.getFirst(); + for (String tableName : tableNames) { + if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) { + // skip edits for other tables + continue; + } + for (KeyValue kv : keyValueList) { + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); + context.write(new TableRowkeyPair(tableName, outputKey), kv); + } + } + } + conn.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + conn.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Write the list of to-import columns to a job configuration. + * + * @param conf configuration to be written to + * @param columnInfoList list of ColumnInfo objects to be configured for import + */ + @VisibleForTesting + static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) { + conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList)); + } + + /** + * Build the list of ColumnInfos for the import based on information in the configuration.
+ */ + @VisibleForTesting + static List<ColumnInfo> buildColumnInfoList(Configuration conf) { + + return Lists.newArrayList( + Iterables.transform( + Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)), + new Function<String, ColumnInfo>() { + @Nullable + @Override + public ColumnInfo apply(@Nullable String input) { + if (input == null || input.isEmpty()) { + // An empty string represents a null that was passed in to + // the configuration, which corresponds to an input column + // which is to be skipped + return null; + } + return ColumnInfo.fromString(input); + } + })); + } + + /** + * Listener that logs successful upserts and errors to job counters. + */ + @VisibleForTesting + static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> { + + private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context; + private final boolean ignoreRecordErrors; + + private MapperUpsertListener( + Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context, + boolean ignoreRecordErrors) { + this.context = context; + this.ignoreRecordErrors = ignoreRecordErrors; + } + + @Override + public void upsertDone(long upsertCount) { + context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L); + } + + @Override + public void errorOnRecord(T record, Throwable throwable) { + LOG.error("Error on record " + record, throwable); + context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L); + if (!ignoreRecordErrors) { + throw Throwables.propagate(throwable); + } + } + } + + /** + * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no + * specific class is configured. This implementation simply passes through the KeyValue + * list that is passed in. + */ + public static class DefaultImportPreUpsertKeyValueProcessor implements + ImportPreUpsertKeyValueProcessor { + + @Override + public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { + return keyValues; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java new file mode 100644 index 0000000..5d00656 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; + +/** + * Reducer class for the bulkload jobs. + * Performs similar functionality to {@link KeyValueSortReducer} + */ +public class FormatToKeyValueReducer + extends Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue> { + + @Override + protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values, + Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context) + throws IOException, InterruptedException { + TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); + for (KeyValue kv: values) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv: map) { + context.write(key, kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java index 62211f3..dff9ef2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java @@ -24,7 +24,8 @@ import java.util.List; /** * A listener hook to process KeyValues that are being written to HFiles for bulk import. * Implementing this interface and configuring it via the {@link - * CsvToKeyValueMapper#UPSERT_HOOK_CLASS_CONFKEY} configuration key. + * org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil#UPSERT_HOOK_CLASS_CONFKEY} + * configuration key. *

* The intention of such a hook is to allow coprocessor-style operations to be performed on + * data that is being bulk-loaded via MapReduce. http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java new file mode 100644 index 0000000..1bea3f0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.util.ColumnInfo; + +/** + * A tool for running MapReduce-based ingests of JSON data. Nested JSON data structures are not + * handled, though lists are converted into typed ARRAYS. + */ +public class JsonBulkLoadTool extends AbstractBulkLoadTool { + + @Override + protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns, + Configuration conf) throws SQLException { + // noop + } + + @Override + protected void setupJob(Job job) { + // Allow overriding the job jar setting by using a -D system property at startup + if (job.getJar() == null) { + job.setJarByClass(JsonToKeyValueMapper.class); + } + job.setMapperClass(JsonToKeyValueMapper.class); + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new JsonBulkLoadTool(), args); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ae4217c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java new file mode 100644 index 0000000..5173a0e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.UpsertExecutor; +import org.apache.phoenix.util.json.JsonUpsertExecutor; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * MapReduce mapper that converts JSON input lines into KeyValues that can be written to HFiles. + *

+ * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then + * extracting the created KeyValues and rolling back the statement execution before it is + * committed to HBase. + */ +public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<String, Object>> { + + private LineParser<Map<String, Object>> lineParser; + + @Override + protected LineParser<Map<String, Object>> getLineParser() { + return lineParser; + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + lineParser = new JsonLineParser(); + } + + @VisibleForTesting + @Override + protected UpsertExecutor<Map<String, Object>, ?> buildUpsertExecutor(Configuration conf) { + String tableName = conf.get(TABLE_NAME_CONFKEY); + Preconditions.checkNotNull(tableName, "table name is not configured"); + + List<ColumnInfo> columnInfoList = buildColumnInfoList(conf); + + return new JsonUpsertExecutor(conn, tableName, columnInfoList, upsertListener); + } + + @VisibleForTesting + static class JsonLineParser implements LineParser<Map<String, Object>> { + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public Map<String, Object> parse(String input) throws IOException { + return mapper.readValue(input, Map.class); + } + } +}
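For anyone trying the new JSON path locally, here is a minimal, self-contained sketch of the per-line contract that JsonToKeyValueMapper assumes: one JSON object per input line, deserialized into a Map by Jackson exactly as JsonLineParser does above. The class name JsonLineParseExample, the sample record, and its field names are illustrative assumptions, not part of this patch.

import java.util.Map;

import org.codehaus.jackson.map.ObjectMapper;

public class JsonLineParseExample {
    public static void main(String[] args) throws Exception {
        // One JSON object per line; keys are matched to the import columns of the target table.
        String line = "{\"id\": 1, \"name\": \"Name 1\"}";
        // Same parsing step as JsonLineParser.parse(String) in JsonToKeyValueMapper.
        Map<?, ?> record = new ObjectMapper().readValue(line, Map.class);
        // In the mapper, this Map is handed to JsonUpsertExecutor, which binds each value to a
        // generated UPSERT; the resulting uncommitted KeyValues are collected for HFile output
        // and the upsert is rolled back rather than committed.
        System.out.println(record); // {id=1, name=Name 1}
    }
}

The job itself is driven through JsonBulkLoadTool.main via ToolRunner; the input path and table options are inherited from AbstractBulkLoadTool earlier in this patch, so they are not repeated here.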