From: mmiller@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Wed, 07 Nov 2018 18:15:37 +0000
Subject: [accumulo] branch master updated: New MapReduce API (#743)
X-Git-Repo: accumulo
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 8434345bef31bc3d2f2dd9d1759ae8b17587f225
X-Git-Newrev: 9dadca0fdd6553ee0d8878139f971d3269eca56a

This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git

The following commit(s) were added to refs/heads/master by this push:
     new 9dadca0  New MapReduce API (#743)
9dadca0 is described below

commit 9dadca0fdd6553ee0d8878139f971d3269eca56a
Author: Mike Miller
AuthorDate: Wed Nov 7 13:15:32 2018 -0500

    New MapReduce API (#743)

    * Created OutputInfo, FileOutputInfo, InputInfo fluent API for building options for single static method
    * Made top level classes extend hadoop classes and moved underlying code to hadoopImpl
    * Updated unit Tests in hadoop-mapreduce to use new API
    * Created NewAccumuloInputFormatIT to test new API
    * Added log4j.properties for testing
    * Removed AccumuloMultiTableInputFormat (see #749)
    * Removed setting of Log4j loglevel
    * Remove deprecated methods and broken javadoc
---
 hadoop-mapreduce/pom.xml | 19 +-
 .../hadoop/mapred/AccumuloFileOutputFormat.java | 160 ++----
 .../hadoop/mapred/AccumuloInputFormat.java | 94 +++-
 .../mapred/AccumuloMultiTableInputFormat.java | 99 ----
 .../hadoop/mapred/AccumuloOutputFormat.java | 547 +-------------------
 .../hadoop/mapred/AccumuloRowInputFormat.java | 82 ++-
 .../hadoop/mapreduce/AccumuloFileOutputFormat.java | 160 ++----
 .../hadoop/mapreduce/AccumuloInputFormat.java | 95 +++-
 .../mapreduce/AccumuloMultiTableInputFormat.java | 94 ----
 .../hadoop/mapreduce/AccumuloOutputFormat.java | 548 +--------------------
 .../hadoop/mapreduce/AccumuloRowInputFormat.java | 80 ++-
 .../accumulo/hadoop/mapreduce/FileOutputInfo.java | 192 ++++++++
 .../accumulo/hadoop/mapreduce/InputInfo.java | 363 ++++++++++++++
 .../accumulo/hadoop/mapreduce/OutputInfo.java | 142 ++++++
 .../mapred/AbstractInputFormat.java | 246 +--------
 .../mapred/AccumuloFileOutputFormatImpl.java} | 70 +--
 .../mapred/AccumuloOutputFormatImpl.java} | 235 +--------
 .../mapred/InputFormatBase.java | 72 +--
 .../mapred/RangeInputSplit.java | 4 +-
 .../mapreduce/AbstractInputFormat.java | 251 +---------
 .../mapreduce/AccumuloFileOutputFormatImpl.java} | 66 +--
 .../mapreduce/AccumuloOutputFormatImpl.java} | 238 +--------
 .../hadoopImpl/mapreduce/BatchInputSplit.java | 1 -
 .../hadoopImpl/mapreduce/FileOutputInfoImpl.java | 159 ++++++
 .../mapreduce/InputFormatBase.java | 77 +--
 .../hadoopImpl/mapreduce/InputInfoImpl.java | 267 ++++++++++
 .../mapreduce/InputTableConfig.java | 12 +-
 .../hadoopImpl/mapreduce/OutputInfoImpl.java | 115 +++++
 .../mapreduce/RangeInputSplit.java | 37 +-
 .../accumulo/hadoopImpl/mapreduce/SplitUtils.java | 7 +-
 .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 102 +---
 .../mapreduce/lib/FileOutputConfigurator.java | 15 +-
 .../mapreduce/lib/InputConfigurator.java | 98 ++--
 .../lib/MapReduceClientOnDefaultTable.java | 16 +-
 .../lib/MapReduceClientOnRequiredTable.java | 17 +-
 .../mapreduce/lib/MapReduceClientOpts.java | 9 +-
 .../hadoopImpl/mapreduce/lib/package-info.java | 39 --
 .../lib/partition/KeyRangePartitioner.java | 2 +-
 .../mapreduce/lib/partition/RangePartitioner.java | 2 +-
 .../mapred/AccumuloFileOutputFormatTest.java | 22 +-
 .../hadoop/mapred/AccumuloInputFormatTest.java | 135 +++--
 .../mapred/AccumuloMultiTableInputFormatTest.java | 69 ---
 .../hadoop/mapred/AccumuloOutputFormatTest.java | 18 +-
 .../hadoop/mapred/AccumuloRowInputFormatIT.java | 211 ++++++++
 .../mapreduce/AccumuloFileOutputFormatTest.java | 23 +-
 .../hadoop/mapreduce/AccumuloInputFormatTest.java | 129 +++--
 .../AccumuloMultiTableInputFormatTest.java | 64 ---
 .../hadoop/mapreduce/AccumuloOutputFormatTest.java | 18 +-
.../hadoop/mapreduce/AccumuloRowInputFormatIT.java | 206 ++++++++ .../hadoop/mapreduce/NewAccumuloInputFormatIT.java | 472 ++++++++++++++++++ .../mapred/RangeInputSplitTest.java | 12 +- .../hadoopImpl/mapreduce/BatchInputSplitTest.java | 10 +- .../mapreduce/InputTableConfigTest.java | 9 +- .../mapreduce/RangeInputSplitTest.java | 12 +- .../mapreduce/lib/ConfiguratorBaseTest.java | 44 -- .../lib/partition/RangePartitionerTest.java | 2 +- .../src/test/resources/log4j.properties | 30 ++ 57 files changed, 2990 insertions(+), 3328 deletions(-) diff --git a/hadoop-mapreduce/pom.xml b/hadoop-mapreduce/pom.xml index 6e64848..8e2b9fc 100644 --- a/hadoop-mapreduce/pom.xml +++ b/hadoop-mapreduce/pom.xml @@ -35,10 +35,6 @@ guava - log4j - log4j - - org.apache.accumulo accumulo-core @@ -61,6 +57,21 @@ test + org.apache.accumulo + accumulo-minicluster + test + + + org.apache.accumulo + accumulo-test + test + + + org.easymock + easymock + test + + org.slf4j slf4j-log4j12 test diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java index a4664d1..d44219d 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java @@ -16,17 +16,24 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setCompressionType; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setDataBlockSize; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setFileBlockSize; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setIndexBlockSize; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setReplication; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSampler; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloFileOutputFormatImpl.setSummarizers; + import java.io.IOException; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; -import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo; import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator; import org.apache.hadoop.conf.Configuration; @@ -37,7 +44,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to write output in the Accumulo data file format.
@@ -45,130 +51,25 @@ import org.apache.log4j.Logger; * requirement of Accumulo data files. * *

- * The output path to be created must be specified via - * {@link AccumuloFileOutputFormat#setOutputPath(JobConf, Path)}. This is inherited from - * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Other methods from - * {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using other - * Hadoop configuration options that affect the behavior of the underlying files directly in the - * Job's configuration may work, but are not directly supported at this time. + * The output path to be created must be specified via {@link #setInfo(JobConf, FileOutputInfo)} + * using {@link FileOutputInfo#builder()}.outputPath(path). For all available options see + * {@link FileOutputInfo#builder()} + *

+ * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause + * failures. Using other Hadoop configuration options that affect the behavior of the underlying + * files directly in the Job's configuration may work, but are not directly supported at this time. + * + * @since 2.0 */ public class AccumuloFileOutputFormat extends FileOutputFormat { - private static final Class CLASS = AccumuloFileOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Sets the compression type to use for data blocks. Specifying a compression may require - * additional libraries to be available to your Job. - * - * @param job - * the Hadoop job instance to be configured - * @param compressionType - * one of "none", "gz", "lzo", or "snappy" - * @since 1.5.0 - */ - public static void setCompressionType(JobConf job, String compressionType) { - FileOutputConfigurator.setCompressionType(CLASS, job, compressionType); - } - - /** - * Sets the size for data blocks within each file.
- * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as - * a group. - * - *

- * Making this value smaller may increase seek performance, but at the cost of increasing the size - * of the indexes (which can also affect seek performance). - * - * @param job - * the Hadoop job instance to be configured - * @param dataBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setDataBlockSize(JobConf job, long dataBlockSize) { - FileOutputConfigurator.setDataBlockSize(CLASS, job, dataBlockSize); - } - - /** - * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by - * the underlying file system. - * - * @param job - * the Hadoop job instance to be configured - * @param fileBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setFileBlockSize(JobConf job, long fileBlockSize) { - FileOutputConfigurator.setFileBlockSize(CLASS, job, fileBlockSize); - } - - /** - * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy - * within the file, while larger blocks mean a more shallow index hierarchy within the file. This - * can affect the performance of queries. - * - * @param job - * the Hadoop job instance to be configured - * @param indexBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setIndexBlockSize(JobConf job, long indexBlockSize) { - FileOutputConfigurator.setIndexBlockSize(CLASS, job, indexBlockSize); - } - - /** - * Sets the file system replication factor for the resulting file, overriding the file system - * default. - * - * @param job - * the Hadoop job instance to be configured - * @param replication - * the number of replicas for produced files - * @since 1.5.0 - */ - public static void setReplication(JobConf job, int replication) { - FileOutputConfigurator.setReplication(CLASS, job, replication); - } - - /** - * Specify a sampler to be used when writing out data. This will result in the output file having - * sample data. - * - * @param job - * The Hadoop job instance to be configured - * @param samplerConfig - * The configuration for creating sample data in the output file. - * @since 1.8.0 - */ - - public static void setSampler(JobConf job, SamplerConfiguration samplerConfig) { - FileOutputConfigurator.setSampler(CLASS, job, samplerConfig); - } - - /** - * Specifies a list of summarizer configurations to create summary data in the output file. Each - * Key Value written will be passed to the configured {@link Summarizer}'s. - * - * @param job - * The Hadoop job instance to be configured - * @param summarizerConfigs - * summarizer configurations - * @since 2.0.0 - */ - public static void setSummarizers(JobConf job, SummarizerConfiguration... summarizerConfigs) { - FileOutputConfigurator.setSummarizers(CLASS, job, summarizerConfigs); - } - @Override public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file final Configuration conf = job; - final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS, - job); + final AccumuloConfiguration acuConf = FileOutputConfigurator + .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job); final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = new Path(getWorkOutputPath(job), @@ -196,4 +97,25 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { }; } + /** + * Sets all the information required for this map reduce job. 
+ */ + public static void setInfo(JobConf job, FileOutputInfo info) { + setOutputPath(job, info.getOutputPath()); + if (info.getCompressionType().isPresent()) + setCompressionType(job, info.getCompressionType().get()); + if (info.getDataBlockSize().isPresent()) + setDataBlockSize(job, info.getDataBlockSize().get()); + if (info.getFileBlockSize().isPresent()) + setFileBlockSize(job, info.getFileBlockSize().get()); + if (info.getIndexBlockSize().isPresent()) + setIndexBlockSize(job, info.getIndexBlockSize().get()); + if (info.getReplication().isPresent()) + setReplication(job, info.getReplication().get()); + if (info.getSampler().isPresent()) + setSampler(job, info.getSampler().get()); + if (info.getSummarizers().size() > 0) + setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0])); + } + } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java index 8a7e6a9..28629c3 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java @@ -16,54 +16,73 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation; + import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.format.DefaultFormatter; -import org.apache.accumulo.hadoop.mapreduce.RangeInputSplit; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat; +import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.RecordReaderBase; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} - * provides keys and values of type {@link Key} and {@link Value} to the Map function. 
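(For illustration: a minimal mapred driver using the single AccumuloFileOutputFormat.setInfo(JobConf, FileOutputInfo) call added in the hunk above might look like the sketch below. Only FileOutputInfo.builder() and its outputPath(...) option are confirmed by the javadoc in this commit; the build() call and the output directory shown are assumptions.)

// Hypothetical driver for the classic mapred API; names marked "assumed" are not taken
// from this commit and may differ in the real builder.
import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class RFileOutputDriver {
  public static void configure(JobConf job) {
    // Build all file-output options in one object, then apply them with a single call.
    FileOutputInfo info = FileOutputInfo.builder()
        .outputPath(new Path("/tmp/mapred-rfiles")) // required: directory where RFiles are written (example path)
        .build();                                   // assumed terminal call on the builder
    AccumuloFileOutputFormat.setInfo(job, info);    // replaces the removed per-option static setters
    job.setOutputFormat(AccumuloFileOutputFormat.class);
  }
}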
+ * This class allows MapReduce jobs to use Accumulo as the source of data. This + * {@link org.apache.hadoop.mapred.InputFormat} provides keys and values of type {@link Key} and + * {@link Value} to the Map function. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *

    - *
  • {@link AccumuloInputFormat#setClientInfo(JobConf, ClientInfo)} - *
  • {@link AccumuloInputFormat#setInputTableName(JobConf, String)}
  • - *
  • {@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} + *
  • {@link AccumuloInputFormat#setInfo(JobConf, InputInfo)} *
* - * Other static methods are optional. + * For required parameters and all available options use {@link InputInfo#builder()} + * + * @since 2.0 */ -public class AccumuloInputFormat extends InputFormatBase { +public class AccumuloInputFormat implements InputFormat { + private static Class CLASS = AccumuloInputFormat.class; + private static Logger log = LoggerFactory.getLogger(CLASS); + + /** + * Gets the splits of the tables that have been set on the job by reading the metadata table for + * the specified ranges. + * + * @return the splits from the tables based on the ranges. + * @throws java.io.IOException + * if a table set on the job doesn't exist or an error occurs initializing the tablet + * locator + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return AbstractInputFormat.getSplits(job, numSplits); + } @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - log.setLevel(getLogLevel(job)); - - // Override the log level from the configuration as if the RangeInputSplit has one it's the more - // correct one to use. - if (split instanceof RangeInputSplit) { - RangeInputSplit accSplit = (RangeInputSplit) split; - Level level = accSplit.getLogLevel(); - if (null != level) { - log.setLevel(level); - } - } else { - throw new IllegalArgumentException("No RecordReader for " + split.getClass()); - } RecordReaderBase recordReader = new RecordReaderBase() { @@ -95,4 +114,29 @@ public class AccumuloInputFormat extends InputFormatBase { recordReader.initialize(split, job); return recordReader; } + + public static void setInfo(JobConf job, InputInfo info) { + setClientInfo(job, info.getClientInfo()); + setScanAuthorizations(job, info.getScanAuths()); + setInputTableName(job, info.getTableName()); + + // all optional values + if (info.getContext().isPresent()) + setClassLoaderContext(job, info.getContext().get()); + if (info.getRanges().size() > 0) + setRanges(job, info.getRanges()); + if (info.getIterators().size() > 0) + InputConfigurator.writeIteratorsToConf(CLASS, job, info.getIterators()); + if (info.getFetchColumns().size() > 0) + InputConfigurator.fetchColumns(CLASS, job, info.getFetchColumns()); + if (info.getSamplerConfig().isPresent()) + setSamplerConfiguration(job, info.getSamplerConfig().get()); + if (info.getExecutionHints().size() > 0) + setExecutionHints(job, info.getExecutionHints()); + setAutoAdjustRanges(job, info.isAutoAdjustRanges()); + setScanIsolation(job, info.isScanIsolation()); + setLocalIterators(job, info.isLocalIterators()); + setOfflineTableScan(job, info.isOfflineScan()); + setBatchScan(job, info.isBatchScan()); + } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormat.java deleted file mode 100644 index e34d307..0000000 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormat.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
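(Illustrative sketch only: the new AccumuloInputFormat.setInfo(JobConf, InputInfo) shown above could be driven roughly as follows. InputInfo.builder() is confirmed by the javadoc, but the individual builder method names below are assumptions inferred from the getters used in setInfo.)

// Hypothetical mapred job setup; builder method names are assumed, not confirmed by this commit.
import java.util.Collections;

import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.InputInfo;
import org.apache.hadoop.mapred.JobConf;

public class ScanInputDriver {
  public static void configure(JobConf job, ClientInfo clientInfo) {
    InputInfo info = InputInfo.builder()
        .clientInfo(clientInfo)                          // assumed name; maps to getClientInfo()
        .table("mytable")                                // assumed name; maps to getTableName()
        .scanAuths(new Authorizations("public"))         // assumed name; maps to getScanAuths()
        .ranges(Collections.singletonList(new Range()))  // assumed optional setting; maps to getRanges()
        .build();                                        // assumed terminal call
    AccumuloInputFormat.setInfo(job, info);              // single static configurator added by this commit
    job.setInputFormat(AccumuloInputFormat.class);
  }
}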
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.hadoop.mapred; - -import java.io.IOException; -import java.util.Map; - -import org.apache.accumulo.core.client.ClientInfo; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.format.DefaultFormatter; -import org.apache.accumulo.hadoop.mapred.InputFormatBase.RecordReaderBase; -import org.apache.accumulo.hadoop.mapreduce.InputTableConfig; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -/** - * This class allows MapReduce jobs to use multiple Accumulo tables as the source of data. This - * {@link org.apache.hadoop.mapred.InputFormat} provides keys and values of type {@link Key} and - * {@link Value} to the Map function. - * - * The user must specify the following via static configurator methods: - * - *
    - *
  • {@link AccumuloInputFormat#setClientInfo(JobConf, ClientInfo)} - *
  • {@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)} - *
  • {@link AccumuloMultiTableInputFormat#setInputTableConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)} - *
- * - * Other static methods are optional. - */ - -public class AccumuloMultiTableInputFormat extends AbstractInputFormat { - - /** - * Sets the {@link InputTableConfig} objects on the given Hadoop configuration - * - * @param job - * the Hadoop job instance to be configured - * @param configs - * the table query configs to be set on the configuration. - * @since 1.6.0 - */ - public static void setInputTableConfigs(JobConf job, Map configs) { - InputConfigurator.setInputTableConfigs(CLASS, job, configs); - } - - @Override - public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) - throws IOException { - log.setLevel(getLogLevel(job)); - RecordReaderBase recordReader = new RecordReaderBase() { - - @Override - public boolean next(Key key, Value value) throws IOException { - if (scannerIterator.hasNext()) { - ++numKeysRead; - Map.Entry entry = scannerIterator.next(); - key.set(currentKey = entry.getKey()); - value.set(entry.getValue().get()); - if (log.isTraceEnabled()) - log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true)); - return true; - } - return false; - } - - @Override - public Key createKey() { - return new Key(); - } - - @Override - public Value createValue() { - return new Value(); - } - - }; - recordReader.initialize(split, job); - return recordReader; - } -} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java index da8fa1b..9fb903a 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java @@ -16,557 +16,54 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getClientInfo; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setBatchWriterOptions; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setCreateTables; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setDefaultTableName; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.setSimulationMode; + import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Set; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientInfo; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; -import org.apache.accumulo.core.client.security.SecurityErrorCode; import 
org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator; +import org.apache.accumulo.hadoop.mapreduce.OutputInfo; +import org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map * and Reduce functions. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *
    - *
  • {@link AccumuloOutputFormat#setClientInfo(JobConf, ClientInfo)} + *
  • {@link AccumuloOutputFormat#setInfo(JobConf, OutputInfo)} *
* - * Other static methods are optional. + * @since 2.0 */ public class AccumuloOutputFormat implements OutputFormat { - private static final Class CLASS = AccumuloOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Set the connection information needed to communicate with Accumulo in this job. - * - * @param job - * Hadoop job to be configured - * @param info - * Accumulo connection information - * @since 2.0.0 - */ - public static void setClientInfo(JobConf job, ClientInfo info) { - ClientInfo outInfo = OutputConfigurator.updateToken(job.getCredentials(), info); - OutputConfigurator.setClientInfo(CLASS, job, outInfo); - } - - /** - * Get the connection information needed to communication with Accumulo - * - * @param job - * Hadoop job to be configured - * @since 2.0.0 - */ - protected static ClientInfo getClientInfo(JobConf job) { - return OutputConfigurator.getClientInfo(CLASS, job); - } - - /** - * Set Accumulo client properties file used to connect to Accumulo - * - * @param job - * Hadoop job to be configured - * @param clientPropsFile - * URL (hdfs:// or http://) to Accumulo client properties file - * @since 2.0.0 - */ - public static void setClientPropertiesFile(JobConf job, String clientPropsFile) { - OutputConfigurator.setClientPropertiesFile(CLASS, job, clientPropsFile); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(JobConf, boolean)} is set to true) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0, use {@link #setClientInfo(JobConf, ClientInfo)} instead. - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - OutputConfigurator.setConnectorInfo(CLASS, job, principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(JobConf, boolean)} is set to true) - * @param tokenFile - * the path to the password file - * @since 1.6.0 - * @deprecated since 2.0.0, use {@link #setClientPropertiesFile(JobConf, String)} instead - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param job - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobConf job) { - return OutputConfigurator.isConnectorInfoSet(CLASS, job); - } - - /** - * Gets the principal from the configuration. - * - * @param job - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static String getPrincipal(JobConf job) { - return OutputConfigurator.getPrincipal(CLASS, job); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param job - * the Hadoop job instance to be configured - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - * @see #setConnectorInfo(JobConf, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobConf job) { - AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, job); - return ConfiguratorBase.unwrapAuthenticationToken(job, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(JobConf, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(JobConf job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param job - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0; Use {@link #getClientInfo(JobConf)} instead - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) { - return OutputConfigurator.getInstance(CLASS, job); - } - - /** - * Sets the log level for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(JobConf job, Level level) { - OutputConfigurator.setLogLevel(CLASS, job, level); - } - - /** - * Gets the log level from this configuration. 
- * - * @param job - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(JobConf, Level) - */ - protected static Level getLogLevel(JobConf job) { - return OutputConfigurator.getLogLevel(CLASS, job); - } - - /** - * Sets the default table name to use if one emits a null in place of a table name for a given - * mutation. Table names can only be alpha-numeric and underscores. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.5.0 - */ - public static void setDefaultTableName(JobConf job, String tableName) { - OutputConfigurator.setDefaultTableName(CLASS, job, tableName); - } - - /** - * Gets the default table name from the configuration. - * - * @param job - * the Hadoop context for the configured job - * @return the default table name - * @since 1.5.0 - * @see #setDefaultTableName(JobConf, String) - */ - protected static String getDefaultTableName(JobConf job) { - return OutputConfigurator.getDefaultTableName(CLASS, job); - } - - /** - * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new - * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration - * multiple times overwrites any previous configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param bwConfig - * the configuration for the {@link BatchWriter} - * @since 1.5.0 - */ - public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) { - OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig); - } - - /** - * Gets the {@link BatchWriterConfig} settings. - * - * @param job - * the Hadoop context for the configured job - * @return the configuration object - * @since 1.5.0 - * @see #setBatchWriterOptions(JobConf, BatchWriterConfig) - */ - protected static BatchWriterConfig getBatchWriterOptions(JobConf job) { - return OutputConfigurator.getBatchWriterOptions(CLASS, job); - } - - /** - * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric - * and underscores. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setCreateTables(JobConf job, boolean enableFeature) { - OutputConfigurator.setCreateTables(CLASS, job, enableFeature); - } - - /** - * Determines whether tables are permitted to be created as needed. - * - * @param job - * the Hadoop context for the configured job - * @return true if the feature is disabled, false otherwise - * @since 1.5.0 - * @see #setCreateTables(JobConf, boolean) - */ - protected static Boolean canCreateTables(JobConf job) { - return OutputConfigurator.canCreateTables(CLASS, job); - } - - /** - * Sets the directive to use simulation mode for this job. In simulation mode, no output is - * produced. This is useful for testing. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setSimulationMode(JobConf job, boolean enableFeature) { - OutputConfigurator.setSimulationMode(CLASS, job, enableFeature); - } - - /** - * Determines whether this feature is enabled. - * - * @param job - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setSimulationMode(JobConf, boolean) - */ - protected static Boolean getSimulationMode(JobConf job) { - return OutputConfigurator.getSimulationMode(CLASS, job); - } - - /** - * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. - */ - protected static class AccumuloRecordWriter implements RecordWriter { - private MultiTableBatchWriter mtbw = null; - private HashMap bws = null; - private Text defaultTableName = null; - - private boolean simulate = false; - private boolean createTables = false; - - private long mutCount = 0; - private long valCount = 0; - - private AccumuloClient client; - - protected AccumuloRecordWriter(JobConf job) - throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(job); - if (l != null) - log.setLevel(getLogLevel(job)); - this.simulate = getSimulationMode(job); - this.createTables = canCreateTables(job); - - if (simulate) - log.info("Simulating output only. No writes to tables will occur"); - - this.bws = new HashMap<>(); - - String tname = getDefaultTableName(job); - this.defaultTableName = (tname == null) ? null : new Text(tname); - - if (!simulate) { - this.client = Accumulo.newClient().usingClientInfo(getClientInfo(job)).build(); - mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(job)); - } - } - - /** - * Push a mutation into a table. If table is null, the defaultTable will be used. If - * {@link AccumuloOutputFormat#canCreateTables(JobConf)} is set, the table will be created if it - * does not exist. The table name must only contain alphanumerics and underscore. - */ - @Override - public void write(Text table, Mutation mutation) throws IOException { - if (table == null || table.toString().isEmpty()) - table = this.defaultTableName; - - if (!simulate && table == null) - throw new IOException("No table or default table specified. 
Try simulation mode next time"); - - ++mutCount; - valCount += mutation.size(); - printMutation(table, mutation); - - if (simulate) - return; - - if (!bws.containsKey(table)) - try { - addTable(table); - } catch (final Exception e) { - log.error("Could not add table '" + table + "'", e); - throw new IOException(e); - } - - try { - bws.get(table).addMutation(mutation); - } catch (MutationsRejectedException e) { - throw new IOException(e); - } - } - - public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { - if (simulate) { - log.info("Simulating adding table: " + tableName); - return; - } - - log.debug("Adding table: " + tableName); - BatchWriter bw = null; - String table = tableName.toString(); - - if (createTables && !client.tableOperations().exists(table)) { - try { - client.tableOperations().create(table); - } catch (AccumuloSecurityException e) { - log.error("Accumulo security violation creating " + table, e); - throw e; - } catch (TableExistsException e) { - // Shouldn't happen - } - } - - try { - bw = mtbw.getBatchWriter(table); - } catch (TableNotFoundException e) { - log.error("Accumulo table " + table + " doesn't exist and cannot be created.", e); - throw new AccumuloException(e); - } catch (AccumuloException | AccumuloSecurityException e) { - throw e; - } - - if (bw != null) - bws.put(tableName, bw); - } - - private int printMutation(Text table, Mutation m) { - if (log.isTraceEnabled()) { - log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow()))); - for (ColumnUpdate cu : m.getUpdates()) { - log.trace(String.format("Table %s column: %s:%s", table, hexDump(cu.getColumnFamily()), - hexDump(cu.getColumnQualifier()))); - log.trace(String.format("Table %s security: %s", table, - new ColumnVisibility(cu.getColumnVisibility()).toString())); - log.trace(String.format("Table %s value: %s", table, hexDump(cu.getValue()))); - } - } - return m.getUpdates().size(); - } - - private String hexDump(byte[] ba) { - StringBuilder sb = new StringBuilder(); - for (byte b : ba) { - if ((b > 0x20) && (b < 0x7e)) - sb.append((char) b); - else - sb.append(String.format("x%02x", b)); - } - return sb.toString(); - } - - @Override - public void close(Reporter reporter) throws IOException { - log.debug("mutations written: " + mutCount + ", values written: " + valCount); - if (simulate) - return; - - try { - mtbw.close(); - } catch (MutationsRejectedException e) { - if (e.getSecurityErrorCodes().size() >= 0) { - HashMap> tables = new HashMap<>(); - for (Entry> ke : e.getSecurityErrorCodes().entrySet()) { - String tableId = ke.getKey().getTableId().toString(); - Set secCodes = tables.get(tableId); - if (secCodes == null) { - secCodes = new HashSet<>(); - tables.put(tableId, secCodes); - } - secCodes.addAll(ke.getValue()); - } - - log.error("Not authorized to write to tables : " + tables); - } - - if (e.getConstraintViolationSummaries().size() > 0) { - log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); - } - throw new IOException(e); - } - } - } - @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - if (!isConnectorInfoSet(job)) - throw new IOException("Connector info has not been set."); try { // if the instance isn't configured, it will complain here - AccumuloClient c = Accumulo.newClient().usingClientInfo(getClientInfo(job)).build(); - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); + ClientInfo clientInfo = getClientInfo(job); + 
String principal = clientInfo.getPrincipal(); + AuthenticationToken token = clientInfo.getAuthenticationToken(); + AccumuloClient c = Accumulo.newClient().usingClientInfo(clientInfo).build(); if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); } catch (AccumuloException | AccumuloSecurityException e) { @@ -578,10 +75,20 @@ public class AccumuloOutputFormat implements OutputFormat { public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { try { - return new AccumuloRecordWriter(job); + return new AccumuloOutputFormatImpl.AccumuloRecordWriter(job); } catch (Exception e) { throw new IOException(e); } } + public static void setInfo(JobConf job, OutputInfo info) { + setClientInfo(job, info.getClientInfo()); + if (info.getBatchWriterOptions().isPresent()) + setBatchWriterOptions(job, info.getBatchWriterOptions().get()); + if (info.getDefaultTableName().isPresent()) + setDefaultTableName(job, info.getDefaultTableName().get()); + setCreateTables(job, info.isCreateTables()); + setSimulationMode(job, info.isSimulationMode()); + } + } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java index c9e4a21..e823d8a 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java @@ -16,15 +16,31 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.fetchColumns; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration; +import static org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation; + import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat; +import org.apache.accumulo.hadoopImpl.mapred.InputFormatBase; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; import 
org.apache.hadoop.mapred.InputSplit; @@ -38,25 +54,38 @@ import org.apache.hadoop.mapred.Reporter; * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map * function. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *

    - *
  • {@link AccumuloRowInputFormat#setClientInfo(JobConf, ClientInfo)} - *
  • {@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} - *
  • {@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} + *
  • {@link AccumuloRowInputFormat#setInfo(JobConf, InputInfo)} *
* - * Other static methods are optional. + * For required parameters and all available options use {@link InputInfo#builder()} + * + * @since 2.0 */ -public class AccumuloRowInputFormat - extends InputFormatBase>> { +public class AccumuloRowInputFormat implements InputFormat>> { + + /** + * Gets the splits of the tables that have been set on the job by reading the metadata table for + * the specified ranges. + * + * @return the splits from the tables based on the ranges. + * @throws java.io.IOException + * if a table set on the job doesn't exist or an error occurs initializing the tablet + * locator + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return AbstractInputFormat.getSplits(job, numSplits); + } + @Override public RecordReader>> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - log.setLevel(getLogLevel(job)); // @formatter:off - RecordReaderBase>> recordReader = - new RecordReaderBase>>() { + InputFormatBase.RecordReaderBase>> recordReader = + new InputFormatBase.RecordReaderBase>>() { // @formatter:on RowIterator rowIterator; @@ -90,4 +119,33 @@ public class AccumuloRowInputFormat recordReader.initialize(split, job); return recordReader; } + + /** + * Sets all the information required for this map reduce job. + */ + public static void setInfo(JobConf job, InputInfo info) { + setClientInfo(job, info.getClientInfo()); + setScanAuthorizations(job, info.getScanAuths()); + setInputTableName(job, info.getTableName()); + + // all optional values + if (info.getContext().isPresent()) + setClassLoaderContext(job, info.getContext().get()); + if (info.getRanges().size() > 0) + setRanges(job, info.getRanges()); + if (info.getIterators().size() > 0) + InputConfigurator.writeIteratorsToConf(AccumuloRowInputFormat.class, job, + info.getIterators()); + if (info.getFetchColumns().size() > 0) + fetchColumns(job, info.getFetchColumns()); + if (info.getSamplerConfig().isPresent()) + setSamplerConfiguration(job, info.getSamplerConfig().get()); + if (info.getExecutionHints().size() > 0) + setExecutionHints(job, info.getExecutionHints()); + setAutoAdjustRanges(job, info.isAutoAdjustRanges()); + setScanIsolation(job, info.isScanIsolation()); + setLocalIterators(job, info.isLocalIterators()); + setOfflineTableScan(job, info.isOfflineScan()); + setBatchScan(job, info.isBatchScan()); + } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java index a47c5aa..26f559e 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java @@ -16,12 +16,18 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setCompressionType; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setDataBlockSize; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setFileBlockSize; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setIndexBlockSize; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setReplication; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSampler; +import static 
org.apache.accumulo.hadoopImpl.mapreduce.AccumuloFileOutputFormatImpl.setSummarizers; + import java.io.IOException; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; -import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -35,7 +41,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to write output in the Accumulo data file format.
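(The hunks that follow make the same change for the newer org.apache.hadoop.mapreduce API, replacing the per-option setters with setInfo(Job, FileOutputInfo). A hypothetical driver sketch, mirroring the earlier mapred example and again assuming a build() terminal call and an example output path, might be:)

// Hypothetical new-API driver; setInfo(Job, FileOutputInfo) is the variant introduced below.
import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class NewApiRFileOutputDriver {
  public static Job createJob() throws Exception {
    Job job = Job.getInstance();
    FileOutputInfo info = FileOutputInfo.builder()
        .outputPath(new Path("/tmp/mapreduce-rfiles")) // example output directory (assumption)
        .build();                                      // assumed terminal call on the builder
    AccumuloFileOutputFormat.setInfo(job, info);       // one call instead of many static setters
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
    return job;
  }
}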
@@ -43,129 +48,24 @@ import org.apache.log4j.Logger; * requirement of Accumulo data files. * *

- * The output path to be created must be specified via - * {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from - * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} - * are not supported and may be ignored or cause failures. Using other Hadoop configuration options - * that affect the behavior of the underlying files directly in the Job's configuration may work, - * but are not directly supported at this time. + * The output path to be created must be specified via {@link #setInfo(Job, FileOutputInfo)} using + * {@link FileOutputInfo#builder()}.outputPath(path). For all available options see + * {@link FileOutputInfo#builder()} + *

+ * Methods inherited from {@link FileOutputFormat} are not supported and may be ignored or cause + * failures. Using other Hadoop configuration options that affect the behavior of the underlying + * files directly in the Job's configuration may work, but are not directly supported at this time. + * + * @since 2.0 */ public class AccumuloFileOutputFormat extends FileOutputFormat { - private static final Class CLASS = AccumuloFileOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Sets the compression type to use for data blocks. Specifying a compression may require - * additional libraries to be available to your Job. - * - * @param job - * the Hadoop job instance to be configured - * @param compressionType - * one of "none", "gz", "lzo", or "snappy" - * @since 1.5.0 - */ - public static void setCompressionType(Job job, String compressionType) { - FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType); - } - - /** - * Sets the size for data blocks within each file.
- * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as - * a group. - * - *

- * Making this value smaller may increase seek performance, but at the cost of increasing the size - * of the indexes (which can also affect seek performance). - * - * @param job - * the Hadoop job instance to be configured - * @param dataBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setDataBlockSize(Job job, long dataBlockSize) { - FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize); - } - - /** - * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by - * the underlying file system. - * - * @param job - * the Hadoop job instance to be configured - * @param fileBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setFileBlockSize(Job job, long fileBlockSize) { - FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize); - } - - /** - * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy - * within the file, while larger blocks mean a more shallow index hierarchy within the file. This - * can affect the performance of queries. - * - * @param job - * the Hadoop job instance to be configured - * @param indexBlockSize - * the block size, in bytes - * @since 1.5.0 - */ - public static void setIndexBlockSize(Job job, long indexBlockSize) { - FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize); - } - - /** - * Sets the file system replication factor for the resulting file, overriding the file system - * default. - * - * @param job - * the Hadoop job instance to be configured - * @param replication - * the number of replicas for produced files - * @since 1.5.0 - */ - public static void setReplication(Job job, int replication) { - FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication); - } - - /** - * Specify a sampler to be used when writing out data. This will result in the output file having - * sample data. - * - * @param job - * The Hadoop job instance to be configured - * @param samplerConfig - * The configuration for creating sample data in the output file. - * @since 1.8.0 - */ - - public static void setSampler(Job job, SamplerConfiguration samplerConfig) { - FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig); - } - - /** - * Specifies a list of summarizer configurations to create summary data in the output file. Each - * Key Value written will be passed to the configured {@link Summarizer}'s. - * - * @param job - * The Hadoop job instance to be configured - * @param summarizerConfigs - * summarizer configurations - * @since 2.0.0 - */ - public static void setSummarizers(Job job, SummarizerConfiguration... summarizerConfigs) { - FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), summarizerConfigs); - } - @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file final Configuration conf = context.getConfiguration(); - final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS, - context.getConfiguration()); + final AccumuloConfiguration acuConf = FileOutputConfigurator + .getAccumuloConfiguration(AccumuloFileOutputFormat.class, context.getConfiguration()); final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = this.getDefaultWorkFile(context, "." 
+ extension); @@ -191,4 +91,26 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { } }; } + + /** + * Sets all the information required for this map reduce job. + */ + public static void setInfo(Job job, FileOutputInfo info) { + setOutputPath(job, info.getOutputPath()); + if (info.getCompressionType().isPresent()) + setCompressionType(job, info.getCompressionType().get()); + if (info.getDataBlockSize().isPresent()) + setDataBlockSize(job, info.getDataBlockSize().get()); + if (info.getFileBlockSize().isPresent()) + setFileBlockSize(job, info.getFileBlockSize().get()); + if (info.getIndexBlockSize().isPresent()) + setIndexBlockSize(job, info.getIndexBlockSize().get()); + if (info.getReplication().isPresent()) + setReplication(job, info.getReplication().get()); + if (info.getSampler().isPresent()) + setSampler(job, info.getSampler().get()); + if (info.getSummarizers().size() > 0) + setSummarizers(job, info.getSummarizers().toArray(new SummarizerConfiguration[0])); + } + } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java index d4fe427..57dfaf5 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java @@ -16,55 +16,74 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation; + import java.io.IOException; +import java.util.List; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat; +import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.log4j.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class allows 
MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} * provides keys and values of type {@link Key} and {@link Value} to the Map function. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *

    - *
 * <ul>
- * <li>{@link AccumuloInputFormat#setClientInfo(Job, ClientInfo)}</li>
- * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}</li>
- * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}</li>
+ * <li>{@link AccumuloInputFormat#setInfo(Job, InputInfo)}</li>
 * </ul>
* - * Other static methods are optional. + * For required parameters and all available options use {@link InputInfo#builder()} + * + * @since 2.0 */ -public class AccumuloInputFormat extends InputFormatBase { +public class AccumuloInputFormat extends InputFormat { + private static Class CLASS = AccumuloInputFormat.class; + private static Logger log = LoggerFactory.getLogger(CLASS); + /** + * Gets the splits of the tables that have been set on the job by reading the metadata table for + * the specified ranges. + * + * @return the splits from the tables based on the ranges. + * @throws java.io.IOException + * if a table set on the job doesn't exist or an error occurs initializing the tablet + * locator + */ @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - log.setLevel(getLogLevel(context)); + public List getSplits(JobContext context) throws IOException { + return AbstractInputFormat.getSplits(context); + } - // Override the log level from the configuration as if the InputSplit has one it's the more - // correct one to use. - if (split instanceof RangeInputSplit) { - RangeInputSplit accSplit = (RangeInputSplit) split; - Level level = accSplit.getLogLevel(); - if (null != level) { - log.setLevel(level); - } - } else { - throw new IllegalArgumentException("No RecordReader for " + split.getClass()); - } + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - return new RecordReaderBase() { + return new InputFormatBase.RecordReaderBase() { @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (scannerIterator.hasNext()) { @@ -80,4 +99,32 @@ public class AccumuloInputFormat extends InputFormatBase { } }; } + + /** + * Sets all the information required for this map reduce job. + */ + public static void setInfo(Job job, InputInfo info) { + setClientInfo(job, info.getClientInfo()); + setScanAuthorizations(job, info.getScanAuths()); + setInputTableName(job, info.getTableName()); + + // all optional values + if (info.getContext().isPresent()) + setClassLoaderContext(job, info.getContext().get()); + if (info.getRanges().size() > 0) + setRanges(job, info.getRanges()); + if (info.getIterators().size() > 0) + InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators()); + if (info.getFetchColumns().size() > 0) + InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns()); + if (info.getSamplerConfig().isPresent()) + setSamplerConfiguration(job, info.getSamplerConfig().get()); + if (info.getExecutionHints().size() > 0) + setExecutionHints(job, info.getExecutionHints()); + setAutoAdjustRanges(job, info.isAutoAdjustRanges()); + setScanIsolation(job, info.isScanIsolation()); + setLocalIterators(job, info.isLocalIterators()); + setOfflineTableScan(job, info.isOfflineScan()); + setBatchScan(job, info.isBatchScan()); + } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormat.java deleted file mode 100644 index b460071..0000000 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormat.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.hadoop.mapreduce; - -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.client.ClientInfo; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.format.DefaultFormatter; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * This class allows MapReduce jobs to use multiple Accumulo tables as the source of data. This - * {@link org.apache.hadoop.mapreduce.InputFormat} provides keys and values of type {@link Key} and - * {@link Value} to the Map function. - * - * The user must specify the following via static configurator methods: - * - *
    - *
- * <ul>
- * <li>{@link AccumuloMultiTableInputFormat#setClientInfo(Job, ClientInfo)}</li>
- * <li>{@link AccumuloMultiTableInputFormat#setScanAuthorizations(Job, Authorizations)}</li>
- * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(Job, Map)}</li>
- * </ul>
- * - * Other static methods are optional. - */ -public class AccumuloMultiTableInputFormat extends AbstractInputFormat { - - /** - * Sets the {@link InputTableConfig} objects on the given Hadoop configuration - * - * @param job - * the Hadoop job instance to be configured - * @param configs - * the table query configs to be set on the configuration. - * @since 1.6.0 - */ - public static void setInputTableConfigs(Job job, Map configs) { - requireNonNull(configs); - InputConfigurator.setInputTableConfigs(CLASS, job.getConfiguration(), configs); - } - - @Override - public RecordReader createRecordReader(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - log.setLevel(getLogLevel(context)); - return new AbstractRecordReader() { - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (scannerIterator.hasNext()) { - ++numKeysRead; - Map.Entry entry = scannerIterator.next(); - currentK = currentKey = entry.getKey(); - currentV = entry.getValue(); - if (log.isTraceEnabled()) - log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true)); - return true; - } - return false; - } - - @Override - protected List contextIterators(TaskAttemptContext context, - String tableName) { - return getInputTableConfig(context, tableName).getIterators(); - } - }; - } -} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java index 6236424..41818ee 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java @@ -16,37 +16,23 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.getClientInfo; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setBatchWriterOptions; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setCreateTables; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setDefaultTableName; +import static org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl.setSimulationMode; + import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Set; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientInfo; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; -import 
org.apache.accumulo.core.client.security.SecurityErrorCode; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator; +import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -55,521 +41,31 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} * accepts keys and values of type {@link Text} (for a table name) and {@link Mutation} from the Map * and Reduce functions. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *
    - *
 * <ul>
- * <li>{@link AccumuloOutputFormat#setClientInfo(Job, ClientInfo)}</li>
+ * <li>{@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}</li>
 * </ul>
* - * Other static methods are optional. + * @since 2.0 */ public class AccumuloOutputFormat extends OutputFormat { - private static final Class CLASS = AccumuloOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Set the connection information needed to communicate with Accumulo in this job. - * - * @param job - * Hadoop job to be configured - * @param info - * Accumulo connection information - * @since 2.0.0 - */ - public static void setClientInfo(Job job, ClientInfo info) { - OutputConfigurator.setClientInfo(CLASS, job.getConfiguration(), info); - } - - /** - * Get connection information from this job - * - * @param context - * Hadoop job context - * @return {@link ClientInfo} - * - * @since 2.0.0 - */ - protected static ClientInfo getClientInfo(JobContext context) { - return OutputConfigurator.getClientInfo(CLASS, context.getConfiguration()); - } - - /** - * Set Accumulo client properties file used to connect to Accumulo - * - * @param job - * Hadoop job to be configured - * @param clientPropsFile - * URL to Accumulo client properties file - * @since 2.0.0 - */ - public static void setClientPropertiesFile(Job job, String clientPropsFile) { - OutputConfigurator.setClientPropertiesFile(CLASS, job.getConfiguration(), clientPropsFile); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(Job, boolean)} is set to true) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0, replaced by {@link #setClientInfo(Job, ClientInfo)} - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(Job, boolean)} is set to true) - * @param tokenFile - * the path to the token file - * @since 1.6.0 - * @deprecated since 2.0.0, replaced by {@link #setClientPropertiesFile(Job, String)} - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param context - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobContext context) { - return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); - } - - /** - * Gets the user name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static String getPrincipal(JobContext context) { - return OutputConfigurator.getPrincipal(CLASS, context.getConfiguration()); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param context - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - * @see #setConnectorInfo(Job, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobContext context) { - AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, - context.getConfiguration()); - return ConfiguratorBase.unwrapAuthenticationToken(context, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(Job, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(Job job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0; Use {@link #getClientInfo(JobContext)} instead. - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobContext context) { - return OutputConfigurator.getInstance(CLASS, context.getConfiguration()); - } - - /** - * Sets the log level for this job. 
- * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(Job job, Level level) { - OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); - } - - /** - * Gets the log level from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(Job, Level) - */ - protected static Level getLogLevel(JobContext context) { - return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration()); - } - - /** - * Sets the default table name to use if one emits a null in place of a table name for a given - * mutation. Table names can only be alpha-numeric and underscores. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.5.0 - */ - public static void setDefaultTableName(Job job, String tableName) { - OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName); - } - - /** - * Gets the default table name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the default table name - * @since 1.5.0 - * @see #setDefaultTableName(Job, String) - */ - protected static String getDefaultTableName(JobContext context) { - return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration()); - } - - /** - * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new - * {@link BatchWriterConfig}, with sensible built-in defaults is used. Setting the configuration - * multiple times overwrites any previous configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param bwConfig - * the configuration for the {@link BatchWriter} - * @since 1.5.0 - */ - public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { - OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig); - } - - /** - * Gets the {@link BatchWriterConfig} settings. - * - * @param context - * the Hadoop context for the configured job - * @return the configuration object - * @since 1.5.0 - * @see #setBatchWriterOptions(Job, BatchWriterConfig) - */ - protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { - return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration()); - } - - /** - * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric - * and underscores. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setCreateTables(Job job, boolean enableFeature) { - OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether tables are permitted to be created as needed. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is disabled, false otherwise - * @since 1.5.0 - * @see #setCreateTables(Job, boolean) - */ - protected static Boolean canCreateTables(JobContext context) { - return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration()); - } - - /** - * Sets the directive to use simulation mode for this job. In simulation mode, no output is - * produced. This is useful for testing. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setSimulationMode(Job job, boolean enableFeature) { - OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether this feature is enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setSimulationMode(Job, boolean) - */ - protected static Boolean getSimulationMode(JobContext context) { - return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration()); - } - - /** - * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. - */ - protected static class AccumuloRecordWriter extends RecordWriter { - private MultiTableBatchWriter mtbw = null; - private HashMap bws = null; - private Text defaultTableName = null; - - private boolean simulate = false; - private boolean createTables = false; - - private long mutCount = 0; - private long valCount = 0; - - private AccumuloClient client; - - protected AccumuloRecordWriter(TaskAttemptContext context) - throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(context); - if (l != null) - log.setLevel(getLogLevel(context)); - this.simulate = getSimulationMode(context); - this.createTables = canCreateTables(context); - - if (simulate) - log.info("Simulating output only. No writes to tables will occur"); - - this.bws = new HashMap<>(); - - String tname = getDefaultTableName(context); - this.defaultTableName = (tname == null) ? null : new Text(tname); - - if (!simulate) { - this.client = Accumulo.newClient().usingClientInfo(getClientInfo(context)).build(); - mtbw = client.createMultiTableBatchWriter(getBatchWriterOptions(context)); - } - } - - /** - * Push a mutation into a table. If table is null, the defaultTable will be used. If - * {@link AccumuloOutputFormat#canCreateTables(JobContext)} is set, the table will be created if - * it does not exist. The table name must only contain alphanumerics and underscore. - */ - @Override - public void write(Text table, Mutation mutation) throws IOException { - if (table == null || table.toString().isEmpty()) - table = this.defaultTableName; - - if (!simulate && table == null) - throw new IOException("No table or default table specified. 
Try simulation mode next time"); - - ++mutCount; - valCount += mutation.size(); - printMutation(table, mutation); - - if (simulate) - return; - - if (!bws.containsKey(table)) - try { - addTable(table); - } catch (Exception e) { - log.error("Could not add table '" + table + "'", e); - throw new IOException(e); - } - - try { - bws.get(table).addMutation(mutation); - } catch (MutationsRejectedException e) { - throw new IOException(e); - } - } - - public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { - if (simulate) { - log.info("Simulating adding table: " + tableName); - return; - } - - log.debug("Adding table: " + tableName); - BatchWriter bw = null; - String table = tableName.toString(); - - if (createTables && !client.tableOperations().exists(table)) { - try { - client.tableOperations().create(table); - } catch (AccumuloSecurityException e) { - log.error("Accumulo security violation creating " + table, e); - throw e; - } catch (TableExistsException e) { - // Shouldn't happen - } - } - - try { - bw = mtbw.getBatchWriter(table); - } catch (TableNotFoundException e) { - log.error("Accumulo table " + table + " doesn't exist and cannot be created.", e); - throw new AccumuloException(e); - } catch (AccumuloException | AccumuloSecurityException e) { - throw e; - } - - if (bw != null) - bws.put(tableName, bw); - } - - private int printMutation(Text table, Mutation m) { - if (log.isTraceEnabled()) { - log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow()))); - for (ColumnUpdate cu : m.getUpdates()) { - log.trace(String.format("Table %s column: %s:%s", table, hexDump(cu.getColumnFamily()), - hexDump(cu.getColumnQualifier()))); - log.trace(String.format("Table %s security: %s", table, - new ColumnVisibility(cu.getColumnVisibility()).toString())); - log.trace(String.format("Table %s value: %s", table, hexDump(cu.getValue()))); - } - } - return m.getUpdates().size(); - } - - private String hexDump(byte[] ba) { - StringBuilder sb = new StringBuilder(); - for (byte b : ba) { - if ((b > 0x20) && (b < 0x7e)) - sb.append((char) b); - else - sb.append(String.format("x%02x", b)); - } - return sb.toString(); - } - - @Override - public void close(TaskAttemptContext attempt) throws IOException, InterruptedException { - log.debug("mutations written: " + mutCount + ", values written: " + valCount); - if (simulate) - return; - - try { - mtbw.close(); - } catch (MutationsRejectedException e) { - if (e.getSecurityErrorCodes().size() >= 0) { - HashMap> tables = new HashMap<>(); - for (Entry> ke : e.getSecurityErrorCodes().entrySet()) { - String tableId = ke.getKey().getTableId().toString(); - Set secCodes = tables.get(tableId); - if (secCodes == null) { - secCodes = new HashSet<>(); - tables.put(tableId, secCodes); - } - secCodes.addAll(ke.getValue()); - } - - log.error("Not authorized to write to tables : " + tables); - } - - if (e.getConstraintViolationSummaries().size() > 0) { - log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); - } - throw new IOException(e); - } - } - } - @Override public void checkOutputSpecs(JobContext job) throws IOException { - if (!isConnectorInfoSet(job)) - throw new IOException("Connector info has not been set."); try { // if the instance isn't configured, it will complain here - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); - AccumuloClient c = Accumulo.newClient().usingClientInfo(getClientInfo(job)).build(); + ClientInfo clientInfo = 
getClientInfo(job); + String principal = clientInfo.getPrincipal(); + AuthenticationToken token = clientInfo.getAuthenticationToken(); + AccumuloClient c = Accumulo.newClient().usingClientInfo(clientInfo).build(); + if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); } catch (AccumuloException | AccumuloSecurityException e) { @@ -586,10 +82,20 @@ public class AccumuloOutputFormat extends OutputFormat { public RecordWriter getRecordWriter(TaskAttemptContext attempt) throws IOException { try { - return new AccumuloRecordWriter(attempt); + return new AccumuloOutputFormatImpl.AccumuloRecordWriter(attempt); } catch (Exception e) { throw new IOException(e); } } + public static void setInfo(Job job, OutputInfo info) { + setClientInfo(job, info.getClientInfo()); + if (info.getBatchWriterOptions().isPresent()) + setBatchWriterOptions(job, info.getBatchWriterOptions().get()); + if (info.getDefaultTableName().isPresent()) + setDefaultTableName(job, info.getDefaultTableName().get()); + setCreateTables(job, info.isCreateTables()); + setSimulationMode(job, info.isSimulationMode()); + } + } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java index d330604..7dfdaa1 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormat.java @@ -16,19 +16,35 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClassLoaderContext; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setClientInfo; +import static org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat.setScanAuthorizations; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setAutoAdjustRanges; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setBatchScan; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setExecutionHints; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setInputTableName; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setLocalIterators; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setOfflineTableScan; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setRanges; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setSamplerConfiguration; +import static org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase.setScanIsolation; + import java.io.IOException; +import java.util.List; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.accumulo.hadoopImpl.mapreduce.AbstractInputFormat; +import org.apache.accumulo.hadoopImpl.mapreduce.InputFormatBase; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import 
org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -38,23 +54,23 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map * function. * - * The user must specify the following via static configurator methods: + * The user must specify the following via static configurator method: * *

    - *
 * <ul>
- * <li>{@link AccumuloRowInputFormat#setClientInfo(Job, ClientInfo)}</li>
- * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}</li>
- * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}</li>
+ * <li>{@link AccumuloRowInputFormat#setInfo(Job, InputInfo)}</li>
 * </ul>
* - * Other static methods are optional. + * For required parameters and all available options use {@link InputInfo#builder()} + * + * @since 2.0 */ -public class AccumuloRowInputFormat - extends InputFormatBase>> { +public class AccumuloRowInputFormat extends InputFormat>> { + private static Class CLASS = AccumuloRowInputFormat.class; + @Override public RecordReader>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - log.setLevel(getLogLevel(context)); - return new RecordReaderBase>>() { + return new InputFormatBase.RecordReaderBase>>() { RowIterator rowIterator; @Override @@ -77,4 +93,46 @@ public class AccumuloRowInputFormat } }; } + + /** + * Gets the splits of the tables that have been set on the job by reading the metadata table for + * the specified ranges. + * + * @return the splits from the tables based on the ranges. + * @throws java.io.IOException + * if a table set on the job doesn't exist or an error occurs initializing the tablet + * locator + */ + @Override + public List getSplits(JobContext context) throws IOException { + return AbstractInputFormat.getSplits(context); + } + + /** + * Sets all the information required for this map reduce job. + */ + public static void setInfo(Job job, InputInfo info) { + setClientInfo(job, info.getClientInfo()); + setScanAuthorizations(job, info.getScanAuths()); + setInputTableName(job, info.getTableName()); + + // all optional values + if (info.getContext().isPresent()) + setClassLoaderContext(job, info.getContext().get()); + if (info.getRanges().size() > 0) + setRanges(job, info.getRanges()); + if (info.getIterators().size() > 0) + InputConfigurator.writeIteratorsToConf(CLASS, job.getConfiguration(), info.getIterators()); + if (info.getFetchColumns().size() > 0) + InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), info.getFetchColumns()); + if (info.getSamplerConfig().isPresent()) + setSamplerConfiguration(job, info.getSamplerConfig().get()); + if (info.getExecutionHints().size() > 0) + setExecutionHints(job, info.getExecutionHints()); + setAutoAdjustRanges(job, info.isAutoAdjustRanges()); + setScanIsolation(job, info.isScanIsolation()); + setLocalIterators(job, info.isLocalIterators()); + setOfflineTableScan(job, info.isOfflineScan()); + setBatchScan(job, info.isBatchScan()); + } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java new file mode 100644 index 0000000..70b6043 --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/FileOutputInfo.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoop.mapreduce; + +import java.util.Collection; +import java.util.Optional; + +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputInfoImpl; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; + +/** + * Object containing all the information needed for the Map Reduce job. This object is passed to + * {@link AccumuloFileOutputFormat#setInfo(Job, FileOutputInfo)}. It uses a fluent API like so: + * + *
+ * FileOutputInfo.builder()
+ *      .outputPath(path)
+ *      .fileBlockSize(b)
+ *      .compressionType(type)
+ *      .summarizers(sc1, sc2).build();
+ * 
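For orientation, a minimal driver sketch using this builder together with AccumuloFileOutputFormat#setInfo(Job, FileOutputInfo) could look like the following; the output path, compression type, and block size below are placeholder values chosen for illustration, not taken from this commit:

import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class FileOutputSetupSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    job.setJobName("rfile-output-sketch");
    // Collect every output option with the fluent builder, then apply them in one call.
    FileOutputInfo info = FileOutputInfo.builder()
        .outputPath(new Path("/tmp/mapred-rfile-output")) // placeholder output directory
        .compressionType("gz")                            // one of "none", "gz", "lzo", "snappy"
        .dataBlockSize(64 * 1024)                         // placeholder block size, in bytes
        .build();
    AccumuloFileOutputFormat.setInfo(job, info);
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
  }
}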
+ * + * @since 2.0 + */ +public interface FileOutputInfo { + + /** + * @return the output path set using FileOutputInfo.builder()...outputPath(path) + */ + public Path getOutputPath(); + + /** + * @return the compression if set using FileOutputInfo.builder()...compressionType(type) + */ + public Optional getCompressionType(); + + /** + * @return the data block size if set using FileOutputInfo.builder()...dataBlockSize(size) + */ + public Optional getDataBlockSize(); + + /** + * @return the file block size if set using FileOutputInfo.builder()...fileBlockSize(size) + */ + public Optional getFileBlockSize(); + + /** + * @return the index block size if set using FileOutputInfo.builder()...indexBlockSize(size) + */ + public Optional getIndexBlockSize(); + + /** + * @return the replication if set using FileOutputInfo.builder()...replication(num) + */ + public Optional getReplication(); + + /** + * @return the SamplerConfiguration if set using FileOutputInfo.builder()...sampler(conf) + */ + public Optional getSampler(); + + /** + * @return the summarizers set using FileOutputInfo.builder()...summarizers(conf1, conf2...) + */ + public Collection getSummarizers(); + + /** + * @return builder for creating a {@link FileOutputInfo} + */ + public static FileOutputInfoBuilder.PathParams builder() { + return new FileOutputInfoImpl.FileOutputInfoBuilderImpl(); + } + + /** + * Fluent API builder for FileOutputInfo + * + * @since 2.0 + */ + interface FileOutputInfoBuilder { + + /** + * Required params for builder + * + * @since 2.0 + */ + interface PathParams { + /** + * Set the Path of the output directory for the map-reduce job. + */ + OutputOptions outputPath(Path path); + } + + /** + * Options for builder + * + * @since 2.0 + */ + interface OutputOptions { + /** + * Sets the compression type to use for data blocks, overriding the default. Specifying a + * compression may require additional libraries to be available to your Job. + * + * @param compressionType + * one of "none", "gz", "lzo", or "snappy" + */ + OutputOptions compressionType(String compressionType); + + /** + * Sets the size for data blocks within each file.
+ * Data blocks are a span of key/value pairs stored in the file that are compressed and
+ * indexed as a group.
+ * <p>
+ * Making this value smaller may increase seek performance, but at the cost of increasing the + * size of the indexes (which can also affect seek performance). + * + * @param dataBlockSize + * the block size, in bytes + */ + OutputOptions dataBlockSize(long dataBlockSize); + + /** + * Sets the size for file blocks in the file system; file blocks are managed, and replicated, + * by the underlying file system. + * + * @param fileBlockSize + * the block size, in bytes + */ + OutputOptions fileBlockSize(long fileBlockSize); + + /** + * Sets the size for index blocks within each file; smaller blocks means a deeper index + * hierarchy within the file, while larger blocks mean a more shallow index hierarchy within + * the file. This can affect the performance of queries. + * + * @param indexBlockSize + * the block size, in bytes + */ + OutputOptions indexBlockSize(long indexBlockSize); + + /** + * Sets the file system replication factor for the resulting file, overriding the file system + * default. + * + * @param replication + * the number of replicas for produced files + */ + OutputOptions replication(int replication); + + /** + * Specify a sampler to be used when writing out data. This will result in the output file + * having sample data. + * + * @param samplerConfig + * The configuration for creating sample data in the output file. + */ + OutputOptions sampler(SamplerConfiguration samplerConfig); + + /** + * Specifies a list of summarizer configurations to create summary data in the output file. + * Each Key Value written will be passed to the configured + * {@link org.apache.accumulo.core.client.summary.Summarizer}'s. + * + * @param summarizerConfigs + * summarizer configurations + */ + OutputOptions summarizers(SummarizerConfiguration... summarizerConfigs); + + /** + * @return newly created {@link FileOutputInfo} + */ + FileOutputInfo build(); + } + } +} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java new file mode 100644 index 0000000..a5bdc89 --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoop.mapreduce; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl; +import org.apache.hadoop.mapreduce.Job; + +/** + * Object containing all the information needed for the Map Reduce job. This object is passed to the + * different Input Format types see {@link AccumuloInputFormat#setInfo(Job, InputInfo)}. It uses a + * fluent API: + * + *

+ * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).build();
+ *
+ * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).addIterator(cfg)
+ *     .executionHints(hints).build();
+ * 
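For orientation, a complete job-setup sketch using this builder with AccumuloInputFormat#setInfo(Job, InputInfo) might look roughly like the following; the client properties, table name, and range are illustrative placeholders and not part of this patch:

import java.util.Collections;
import java.util.Properties;

import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.InputInfo;
import org.apache.hadoop.mapreduce.Job;

public class InputSetupSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; normally these come from accumulo-client.properties.
    Properties props = new Properties();
    props.setProperty("instance.name", "myinstance");
    props.setProperty("instance.zookeepers", "zkhost:2181");
    props.setProperty("auth.principal", "user");
    props.setProperty("auth.type", "password");
    props.setProperty("auth.token", "secret");

    Job job = Job.getInstance();
    InputInfo info = InputInfo.builder()
        .clientInfo(ClientInfo.from(props))
        .table("mytable")                                 // placeholder table name
        .scanAuths(Authorizations.EMPTY)
        .ranges(Collections.singletonList(new Range()))   // optional: limit what is scanned
        .batchScan()                                      // optional: one split per tablet
        .build();
    AccumuloInputFormat.setInfo(job, info);
    job.setInputFormatClass(AccumuloInputFormat.class);
  }
}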
+ * + * @since 2.0 + */ +public interface InputInfo { + /** + * @return the table name set using InputInfo.builder()...table(name) + */ + String getTableName(); + + /** + * @return the client info set using InputInfo.builder().clientInfo(info) + */ + ClientInfo getClientInfo(); + + /** + * @return the scan authorizations set using InputInfo.builder()...scanAuths(auths) + */ + Authorizations getScanAuths(); + + /** + * @return the context set using InputInfo.builder()...classLoaderContext(context) + */ + Optional getContext(); + + /** + * @return the Ranges set using InputInfo.builder()...ranges(ranges) + */ + Collection getRanges(); + + /** + * @return the ColumnFamily,ColumnQualifier Pairs set using + * InputInfo.builder()...fetchColumns(cfcqPairs) + */ + Collection getFetchColumns(); + + /** + * @return the collection of IteratorSettings set using InputInfo.builder()...addIterator(cfg) + */ + Collection getIterators(); + + /** + * @return the SamplerConfiguration set using InputInfo.builder()...samplerConfiguration(cfg) + */ + Optional getSamplerConfig(); + + /** + * @return the Execution Hints set using InputInfo.builder()...executionHints(hints) + */ + Map getExecutionHints(); + + /** + * @return boolean if auto adjusting ranges or not + */ + boolean isAutoAdjustRanges(); + + /** + * @return boolean if using scan isolation or not + */ + boolean isScanIsolation(); + + /** + * @return boolean if using local iterators or not + */ + boolean isLocalIterators(); + + /** + * @return boolean if using offline scan or not + */ + boolean isOfflineScan(); + + /** + * @return boolean if using batch scanner or not + */ + boolean isBatchScan(); + + /** + * Builder starting point for map reduce input format information. + */ + static InputInfoBuilder.ClientParams builder() { + return new InputInfoImpl.InputInfoBuilderImpl(); + } + + /** + * Required build values to be set. + * + * @since 2.0 + */ + interface InputInfoBuilder { + /** + * Required params for builder + * + * @since 2.0 + */ + interface ClientParams { + /** + * Set the connection information needed to communicate with Accumulo in this job. ClientInfo + * param can be created using {@link ClientInfo#from(String)} or + * {@link ClientInfo#from(Properties)} + * + * @param clientInfo + * Accumulo connection information + */ + TableParams clientInfo(ClientInfo clientInfo); + } + + /** + * Required params for builder + * + * @since 2.0 + */ + interface TableParams { + /** + * Sets the name of the input table, over which this job will scan. + * + * @param tableName + * the table to use when the tablename is null in the write call + */ + AuthsParams table(String tableName); + } + + /** + * Required params for builder + * + * @since 2.0 + */ + interface AuthsParams { + /** + * Sets the {@link Authorizations} used to scan. Must be a subset of the user's + * authorizations. 
If none present use {@link Authorizations#EMPTY} + * + * @param auths + * the user's authorizations + */ + InputFormatOptions scanAuths(Authorizations auths); + } + + /** + * Options for batch scan + * + * @since 2.0 + */ + interface BatchScanOptions { + /** + * @return newly created {@link InputInfo} + */ + InputInfo build(); + } + + /** + * Options for scan + * + * @since 2.0 + */ + interface ScanOptions extends BatchScanOptions { + /** + * @see InputFormatOptions#scanIsolation() + */ + ScanOptions scanIsolation(); + + /** + * @see InputFormatOptions#localIterators() + */ + ScanOptions localIterators(); + + /** + * @see InputFormatOptions#offlineScan() + */ + ScanOptions offlineScan(); + } + + /** + * Optional values to set using fluent API + * + * @since 2.0 + */ + interface InputFormatOptions { + /** + * Sets the name of the classloader context on this scanner + * + * @param context + * name of the classloader context + */ + InputFormatOptions classLoaderContext(String context); + + /** + * Sets the input ranges to scan for the single input table associated with this job. + * + * @param ranges + * the ranges that will be mapped over + * @see TableOperations#splitRangeByTablets(String, Range, int) + */ + InputFormatOptions ranges(Collection ranges); + + /** + * Restricts the columns that will be mapped over for this job for the default input table. + * + * @param fetchColumns + * a collection of IteratorSetting.Column objects corresponding to column family and + * column qualifier. If the column qualifier is null, the entire column family is + * selected. An empty set is the default and is equivalent to scanning all columns. + */ + InputFormatOptions fetchColumns(Collection fetchColumns); + + /** + * Encode an iterator on the single input table for this job. It is safe to call this method + * multiple times. If an iterator is added with the same name, it will be overridden. + * + * @param cfg + * the configuration of the iterator + */ + InputFormatOptions addIterator(IteratorSetting cfg); + + /** + * Set these execution hints on scanners created for input splits. See + * {@link ScannerBase#setExecutionHints(java.util.Map)} + */ + InputFormatOptions executionHints(Map hints); + + /** + * Causes input format to read sample data. If sample data was created using a different + * configuration or a tables sampler configuration changes while reading data, then the input + * format will throw an error. + * + * @param samplerConfig + * The sampler configuration that sample must have been created with inorder for + * reading sample data to succeed. + * + * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration) + */ + InputFormatOptions samplerConfiguration(SamplerConfiguration samplerConfig); + + /** + * Disables the automatic adjustment of ranges for this job. This feature merges overlapping + * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause + * exactly one Map task to be created for each specified range. Disabling has no effect for + * batch scans at it will always automatically adjust ranges. + *

+ * By default, this feature is enabled. + * + * @see #ranges(Collection) + */ + InputFormatOptions disableAutoAdjustRanges(); + + /** + * Enables the use of the {@link IsolatedScanner} in this job. + *

+ * By default, this feature is disabled. + */ + ScanOptions scanIsolation(); + + /** + * Enables the use of the {@link ClientSideIteratorScanner} in this job. This feature will + * cause the iterator stack to be constructed within the Map task, rather than within the + * Accumulo TServer. To use this feature, all classes needed for those iterators must be + * available on the classpath for the task. + *

+ * By default, this feature is disabled. + */ + ScanOptions localIterators(); + + /** + * Enable reading offline tables. By default, this feature is disabled and only online tables + * are scanned. This will make the map reduce job directly read the table's files. If the + * table is not offline, then the job will fail. If the table comes online during the map + * reduce job, it is likely that the job will fail. + *

+ * To use this option, the map reduce user will need access to read the Accumulo directory in + * HDFS. + *

+ * Reading the offline table will create the scan time iterator stack in the map process. So + * any iterators that are configured for the table will need to be on the mapper's classpath. + *

+ * One way to use this feature is to clone a table, take the clone offline, and use the clone
+ * as the input table for a map reduce job. If you plan to map reduce over the data many
+ * times, it may be better to compact the table, clone it, take it offline, and use the
+ * clone for all map reduce jobs. The reason to do this is that compaction will reduce each
+ * tablet in the table to one file, and it is faster to read from one file.
+ * <p>
+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you
+ * may see better read performance. Second, it will support speculative execution better. When
+ * reading an online table, speculative execution can put more load on an already slow tablet
+ * server.
+ * <p>
+ * By default, this feature is disabled. + */ + ScanOptions offlineScan(); + + /** + * Enables the use of the {@link org.apache.accumulo.core.client.BatchScanner} in this job. + * Using this feature will group Ranges by their source tablet, producing an InputSplit per + * tablet rather than per Range. This batching helps to reduce overhead when querying a large + * number of small ranges. (ex: when doing quad-tree decomposition for spatial queries) + *

+ * In order to achieve good locality of InputSplits this option always clips the input Ranges + * to tablet boundaries. This may result in one input Range contributing to several + * InputSplits. + *

+ * Note: calls to {@link #disableAutoAdjustRanges()} are ignored when BatchScan is enabled.
+ * <p>
+ * This configuration is incompatible with:
+ * <ul>
+ * <li>{@link #offlineScan()}</li>
+ * <li>{@link #localIterators()}</li>
+ * <li>{@link #scanIsolation()}</li>
+ * </ul>
+ *

+ * By default, this feature is disabled. + */ + BatchScanOptions batchScan(); + + /** + * @return newly created {@link InputInfo} + */ + InputInfo build(); + } + } +} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java new file mode 100644 index 0000000..20f9667 --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/OutputInfo.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.hadoop.mapreduce; + +import java.util.Optional; +import java.util.Properties; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.hadoopImpl.mapreduce.OutputInfoImpl; +import org.apache.hadoop.mapreduce.Job; + +/** + * Object containing all the information needed for the Map Reduce job. This object is passed to + * {@link AccumuloOutputFormat#setInfo(Job, OutputInfo)}. It uses a fluent API like so: + * + *

+ * OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build();
+ * 
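Expanded into a small driver fragment (an illustrative sketch; the table name and buffer size are placeholders, and the optional steps are the ones documented below), that fluent call might look like:

    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputSetup {
      static void configure(Job job, ClientInfo clientInfo) {
        // Tune the BatchWriter used by each task; a default config is used when this is omitted
        BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(64 * 1024 * 1024L);
        OutputInfo info = OutputInfo.builder()
            .clientInfo(clientInfo)           // required first step (ClientParams)
            .batchWriterOptions(bwConfig)     // optional
            .defaultTableName("output_table") // used when a Mutation is written with a null table
            .enableCreateTables()             // create missing tables instead of failing
            .build();
        AccumuloOutputFormat.setInfo(job, info);
        job.setOutputFormatClass(AccumuloOutputFormat.class);
      }
    }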
+ * + * @since 2.0 + */ +public interface OutputInfo { + /** + * @return the client info set using OutputInfo.builder().clientInfo(info) + */ + ClientInfo getClientInfo(); + + /** + * @return the BatchWriterConfig set using OutputInfo.builder()...batchWriterOptions(conf) + */ + Optional<BatchWriterConfig> getBatchWriterOptions(); + + /** + * @return the default table name set using OutputInfo.builder()...defaultTableName(name) + */ + Optional<String> getDefaultTableName(); + + /** + * @return boolean if creating tables or not + */ + boolean isCreateTables(); + + /** + * @return boolean if running simulation mode or not + */ + boolean isSimulationMode(); + + /** + * @return builder for creating a {@link OutputInfo} + */ + public static OutputInfoBuilder.ClientParams builder() { + return new OutputInfoImpl.OutputInfoBuilderImpl(); + } + + /** + * Fluent API builder for OutputInfo + * + * @since 2.0 + */ + interface OutputInfoBuilder { + + /** + * Required params for client + * + * @since 2.0 + */ + interface ClientParams { + /** + * Set the connection information needed to communicate with Accumulo in this job. ClientInfo + * param can be created using {@link ClientInfo#from(String)} or + * {@link ClientInfo#from(Properties)} + * + * @param clientInfo + * Accumulo connection information + */ + OutputOptions clientInfo(ClientInfo clientInfo); + } + + /** + * Builder options + * + * @since 2.0 + */ + interface OutputOptions { + /** + * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new + * {@link BatchWriterConfig}, with sensible built-in defaults, is used. Setting the + * configuration multiple times overwrites any previous configuration. + * + * @param bwConfig + * the configuration for the {@link BatchWriter} + */ + OutputOptions batchWriterOptions(BatchWriterConfig bwConfig); + + /** + * Sets the default table name to use if one emits a null in place of a table name for a given + * mutation. Table names can only be alpha-numeric and underscores. + * + * @param tableName + * the table to use when the tablename is null in the write call + */ + OutputOptions defaultTableName(String tableName); + + /** + * Enables the directive to create new tables, as necessary. Table names can only be + * alpha-numeric and underscores. + *

+ * By default, this feature is disabled. + */ + OutputOptions enableCreateTables(); + + /** + * Enables the directive to use simulation mode for this job. In simulation mode, no output is + * produced. This is useful for testing. + *

+ * By default, this feature is disabled. + */ + OutputOptions enableSimulationMode(); + + /** + * @return newly created {@link OutputInfo} + */ + OutputInfo build(); + } + } +} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java similarity index 69% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AbstractInputFormat.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java index 0587096..c5dc14a 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AbstractInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -45,21 +44,13 @@ import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -67,28 +58,26 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.hadoop.mapreduce.InputTableConfig; -import org.apache.accumulo.hadoopImpl.mapred.BatchInputSplit; +import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat; +import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig; import org.apache.accumulo.hadoopImpl.mapreduce.SplitUtils; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An abstract input format to provide shared methods common to all other input format classes. At * the very least, any classes inheriting from this class will need to define their own * {@link RecordReader}. */ -public abstract class AbstractInputFormat implements InputFormat { +public abstract class AbstractInputFormat { protected static final Class CLASS = AccumuloInputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); + private static final Logger log = LoggerFactory.getLogger(CLASS); /** * Sets the name of the classloader context on this scanner @@ -111,7 +100,7 @@ public abstract class AbstractInputFormat implements InputFormat { * @return name of the current context * @since 1.8.0 */ - public static String getClassLoaderContext(JobConf job) { + protected static String getClassLoaderContext(JobConf job) { return InputConfigurator.getClassLoaderContext(CLASS, job); } @@ -138,7 +127,7 @@ public abstract class AbstractInputFormat implements InputFormat { * URL to Accumulo client properties file * @since 2.0.0 */ - public static void setClientPropertiesFile(JobConf job, String clientPropsFile) { + protected static void setClientPropertiesFile(JobConf job, String clientPropsFile) { InputConfigurator.setClientPropertiesFile(CLASS, job, clientPropsFile); } @@ -155,179 +144,6 @@ public abstract class AbstractInputFormat implements InputFormat { } /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0, use {@link #setClientInfo(JobConf, ClientInfo)} instead - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - if (token instanceof KerberosToken) { - log.info("Received KerberosToken, attempting to fetch DelegationToken"); - try { - AccumuloClient client = Accumulo.newClient().usingClientInfo(getClientInfo(job)) - .usingToken(principal, token).build(); - token = client.securityOperations().getDelegationToken(new DelegationTokenConfig()); - } catch (Exception e) { - log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely" - + " fail to communicate with Accumulo", e); - } - } - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - InputConfigurator.setConnectorInfo(CLASS, job, principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param tokenFile - * the path to the token file - * @since 1.6.0 - * @deprecated since 2.0.0, use {@link #setClientPropertiesFile(JobConf, String)} instead - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param job - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobConf job) { - return InputConfigurator.isConnectorInfoSet(CLASS, job); - } - - /** - * Gets the user name from the configuration. - * - * @param job - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static String getPrincipal(JobConf job) { - return InputConfigurator.getPrincipal(CLASS, job); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param job - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - * @see #setConnectorInfo(JobConf, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobConf job) { - AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, job); - return ConfiguratorBase.unwrapAuthenticationToken(job, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param clientConfig - * client configuration containing connection options - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(JobConf, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(JobConf job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param job - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0, Use {@link #getClientInfo(JobConf)} instead - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) { - return InputConfigurator.getInstance(CLASS, job); - } - - /** - * Sets the log level for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(JobConf job, Level level) { - InputConfigurator.setLogLevel(CLASS, job, level); - } - - /** - * Gets the log level from this configuration. 
- * - * @param job - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(JobConf, Level) - */ - protected static Level getLogLevel(JobConf job) { - return InputConfigurator.getLogLevel(CLASS, job); - } - - /** * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a * subset of the user's authorization. Defaults to the empty set. * @@ -354,21 +170,6 @@ public abstract class AbstractInputFormat implements InputFormat { return InputConfigurator.getScanAuthorizations(CLASS, job); } - /** - * Fetch the client configuration from the job. - * - * @param job - * The job - * @return The client configuration for the job - * @since 1.7.0 - * @deprecated since 2.0.0, replaced by {@link #getClientInfo(JobConf)} - */ - @Deprecated - protected static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration( - JobConf job) { - return InputConfigurator.getClientConfiguration(CLASS, job); - } - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) /** * Check whether a configuration is fully configured to be used with an Accumulo @@ -393,7 +194,7 @@ public abstract class AbstractInputFormat implements InputFormat { * @return the {@link InputTableConfig} objects set on the job * @since 1.6.0 */ - public static Map getInputTableConfigs(JobConf job) { + protected static Map getInputTableConfigs(JobConf job) { return InputConfigurator.getInputTableConfigs(CLASS, job); } @@ -410,7 +211,7 @@ public abstract class AbstractInputFormat implements InputFormat { * @return the {@link InputTableConfig} for the given table * @since 1.6.0 */ - public static InputTableConfig getInputTableConfig(JobConf job, String tableName) { + protected static InputTableConfig getInputTableConfig(JobConf job, String tableName) { return InputConfigurator.getInputTableConfig(CLASS, job, tableName); } @@ -427,11 +228,11 @@ public abstract class AbstractInputFormat implements InputFormat { *

  • int {@link #numKeysRead} (used for progress reporting)
  • * */ - protected abstract static class AbstractRecordReader implements RecordReader { + public abstract static class AbstractRecordReader implements RecordReader { protected long numKeysRead; protected Iterator> scannerIterator; protected RangeInputSplit split; - private org.apache.accumulo.hadoop.mapreduce.RangeInputSplit baseSplit; + private org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit baseSplit; protected ScannerBase scannerBase; /** @@ -458,7 +259,7 @@ public abstract class AbstractInputFormat implements InputFormat { * @since 1.7.0 */ private void setupIterators(JobConf job, ScannerBase scanner, String tableName, - org.apache.accumulo.hadoop.mapreduce.RangeInputSplit split) { + org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit split) { List iterators = null; if (null == split) { @@ -478,7 +279,7 @@ public abstract class AbstractInputFormat implements InputFormat { * Initialize a scanner over the given input split using this task attempt configuration. */ public void initialize(InputSplit inSplit, JobConf job) throws IOException { - baseSplit = (org.apache.accumulo.hadoop.mapreduce.RangeInputSplit) inSplit; + baseSplit = (org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit) inSplit; log.debug("Initializing input split: " + baseSplit); ClientContext context = new ClientContext(getClientInfo(job)); @@ -566,7 +367,7 @@ public abstract class AbstractInputFormat implements InputFormat { throw new IllegalArgumentException("Can not initialize from " + baseSplit.getClass()); } - Collection> columns = baseSplit.getFetchedColumns(); + Collection columns = baseSplit.getFetchedColumns(); if (null == columns) { columns = tableConfig.getFetchedColumns(); } @@ -627,8 +428,8 @@ public abstract class AbstractInputFormat implements InputFormat { } - Map>> binOfflineTable(JobConf job, Table.ID tableId, - List ranges) + public static Map>> binOfflineTable(JobConf job, + Table.ID tableId, List ranges) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { ClientContext context = new ClientContext(getClientInfo(job)); return InputConfigurator.binOffline(tableId, ranges, context); @@ -643,10 +444,7 @@ public abstract class AbstractInputFormat implements InputFormat { * if a table set on the job doesn't exist or an error occurs initializing the tablet * locator */ - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - Level logLevel = getLogLevel(job); - log.setLevel(logLevel); + public static InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { validateOptions(job); Random random = new SecureRandom(); @@ -743,7 +541,7 @@ public abstract class AbstractInputFormat implements InputFormat { BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location}); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); splits.add(split); } else { @@ -753,7 +551,7 @@ public abstract class AbstractInputFormat implements InputFormat { // divide ranges into smaller ranges, based on the tablets RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), ke.clip(r), new String[] {location}); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); @@ -776,7 +574,7 @@ public abstract class 
AbstractInputFormat implements InputFormat { for (Map.Entry> entry : splitsToAdd.entrySet()) { RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), entry.getKey(), entry.getValue().toArray(new String[0])); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloFileOutputFormatImpl.java similarity index 62% copy from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java copy to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloFileOutputFormatImpl.java index a4664d1..42ac709 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloFileOutputFormatImpl.java @@ -14,48 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; -import java.io.IOException; - -import org.apache.accumulo.core.client.rfile.RFile; -import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; +import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat; import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; -/** - * This class allows MapReduce jobs to write output in the Accumulo data file format.
    - * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important - * requirement of Accumulo data files. - * - *
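In practice, wiring up the JobConf-based flavor described here looks roughly like the following sketch (editor's example; the output path is a placeholder, and the job must emit Key/Value pairs already sorted by Key):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.hadoop.mapred.AccumuloFileOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class FileOutputSetup {
      static void configure(JobConf job) {
        job.setOutputFormat(AccumuloFileOutputFormat.class);
        job.setOutputKeyClass(Key.class);
        job.setOutputValueClass(Value.class);
        // Output path is required; setOutputPath is inherited from Hadoop's FileOutputFormat
        AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/accumulo-rfiles"));
      }
    }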

    - * The output path to be created must be specified via - * {@link AccumuloFileOutputFormat#setOutputPath(JobConf, Path)}. This is inherited from - * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Other methods from - * {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using other - * Hadoop configuration options that affect the behavior of the underlying files directly in the - * Job's configuration may work, but are not directly supported at this time. - */ -public class AccumuloFileOutputFormat extends FileOutputFormat { +public class AccumuloFileOutputFormatImpl { private static final Class CLASS = AccumuloFileOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); /** * Sets the compression type to use for data blocks. Specifying a compression may require @@ -162,38 +132,4 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { FileOutputConfigurator.setSummarizers(CLASS, job, summarizerConfigs); } - @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { - // get the path of the temporary output file - final Configuration conf = job; - final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS, - job); - - final String extension = acuConf.get(Property.TABLE_FILE_TYPE); - final Path file = new Path(getWorkOutputPath(job), - getUniqueName(job, "part") + "." + extension); - final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf); - - return new RecordWriter() { - RFileWriter out = null; - - @Override - public void close(Reporter reporter) throws IOException { - if (out != null) - out.close(); - } - - @Override - public void write(Key key, Value value) throws IOException { - if (out == null) { - out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)) - .withTableProperties(acuConf).withVisibilityCacheSize(visCacheSize).build(); - out.startDefaultLocalityGroup(); - } - out.append(key, value); - } - }; - } - } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java similarity index 57% copy from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java copy to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java index da8fa1b..7e8f4d8 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloOutputFormatImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; import java.io.IOException; import java.util.HashMap; @@ -33,30 +33,20 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.security.SecurityErrorCode; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; +import org.apache.accumulo.hadoop.mapred.AccumuloOutputFormat; import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} @@ -66,15 +56,15 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * *

 * <ul>
- * <li>{@link AccumuloOutputFormat#setClientInfo(JobConf, ClientInfo)}
+ * <li>{@link AccumuloOutputFormatImpl#setClientInfo(JobConf, ClientInfo)}
    * * Other static methods are optional. */ -public class AccumuloOutputFormat implements OutputFormat { +public class AccumuloOutputFormatImpl { private static final Class CLASS = AccumuloOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); + private static final Logger log = LoggerFactory.getLogger(CLASS); /** * Set the connection information needed to communicate with Accumulo in this job. @@ -97,7 +87,7 @@ public class AccumuloOutputFormat implements OutputFormat { * Hadoop job to be configured * @since 2.0.0 */ - protected static ClientInfo getClientInfo(JobConf job) { + public static ClientInfo getClientInfo(JobConf job) { return OutputConfigurator.getClientInfo(CLASS, job); } @@ -110,176 +100,11 @@ public class AccumuloOutputFormat implements OutputFormat { * URL (hdfs:// or http://) to Accumulo client properties file * @since 2.0.0 */ - public static void setClientPropertiesFile(JobConf job, String clientPropsFile) { + protected static void setClientPropertiesFile(JobConf job, String clientPropsFile) { OutputConfigurator.setClientPropertiesFile(CLASS, job, clientPropsFile); } /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(JobConf, boolean)} is set to true) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0, use {@link #setClientInfo(JobConf, ClientInfo)} instead. - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - OutputConfigurator.setConnectorInfo(CLASS, job, principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(JobConf, boolean)} is set to true) - * @param tokenFile - * the path to the password file - * @since 1.6.0 - * @deprecated since 2.0.0, use {@link #setClientPropertiesFile(JobConf, String)} instead - */ - @Deprecated - public static void setConnectorInfo(JobConf job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param job - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobConf job) { - return OutputConfigurator.isConnectorInfoSet(CLASS, job); - } - - /** - * Gets the principal from the configuration. - * - * @param job - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - */ - protected static String getPrincipal(JobConf job) { - return OutputConfigurator.getPrincipal(CLASS, job); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param job - * the Hadoop job instance to be configured - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(JobConf, String, AuthenticationToken) - * @see #setConnectorInfo(JobConf, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobConf job) { - AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, job); - return ConfiguratorBase.unwrapAuthenticationToken(job, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(JobConf, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(JobConf job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param job - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0; Use {@link #getClientInfo(JobConf)} instead - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) { - return OutputConfigurator.getInstance(CLASS, job); - } - - /** - * Sets the log level for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(JobConf job, Level level) { - OutputConfigurator.setLogLevel(CLASS, job, level); - } - - /** - * Gets the log level from this configuration. 
- * - * @param job - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(JobConf, Level) - */ - protected static Level getLogLevel(JobConf job) { - return OutputConfigurator.getLogLevel(CLASS, job); - } - - /** * Sets the default table name to use if one emits a null in place of a table name for a given * mutation. Table names can only be alpha-numeric and underscores. * @@ -330,7 +155,7 @@ public class AccumuloOutputFormat implements OutputFormat { * @since 1.5.0 * @see #setBatchWriterOptions(JobConf, BatchWriterConfig) */ - protected static BatchWriterConfig getBatchWriterOptions(JobConf job) { + public static BatchWriterConfig getBatchWriterOptions(JobConf job) { return OutputConfigurator.getBatchWriterOptions(CLASS, job); } @@ -397,7 +222,7 @@ public class AccumuloOutputFormat implements OutputFormat { /** * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. */ - protected static class AccumuloRecordWriter implements RecordWriter { + public static class AccumuloRecordWriter implements RecordWriter { private MultiTableBatchWriter mtbw = null; private HashMap bws = null; private Text defaultTableName = null; @@ -410,11 +235,7 @@ public class AccumuloOutputFormat implements OutputFormat { private AccumuloClient client; - protected AccumuloRecordWriter(JobConf job) - throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(job); - if (l != null) - log.setLevel(getLogLevel(job)); + public AccumuloRecordWriter(JobConf job) throws AccumuloException, AccumuloSecurityException { this.simulate = getSimulationMode(job); this.createTables = canCreateTables(job); @@ -434,8 +255,8 @@ public class AccumuloOutputFormat implements OutputFormat { /** * Push a mutation into a table. If table is null, the defaultTable will be used. If - * {@link AccumuloOutputFormat#canCreateTables(JobConf)} is set, the table will be created if it - * does not exist. The table name must only contain alphanumerics and underscore. + * {@link AccumuloOutputFormatImpl#canCreateTables(JobConf)} is set, the table will be created + * if it does not exist. The table name must only contain alphanumerics and underscore. 
*/ @Override public void write(Text table, Mutation mutation) throws IOException { @@ -467,7 +288,7 @@ public class AccumuloOutputFormat implements OutputFormat { } } - public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { + protected void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { if (simulate) { log.info("Simulating adding table: " + tableName); return; @@ -558,30 +379,4 @@ public class AccumuloOutputFormat implements OutputFormat { } } - @Override - public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - if (!isConnectorInfoSet(job)) - throw new IOException("Connector info has not been set."); - try { - // if the instance isn't configured, it will complain here - AccumuloClient c = Accumulo.newClient().usingClientInfo(getClientInfo(job)).build(); - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); - if (!c.securityOperations().authenticateUser(principal, token)) - throw new IOException("Unable to authenticate user"); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - } - - @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { - try { - return new AccumuloRecordWriter(job); - } catch (Exception e) { - throw new IOException(e); - } - } - } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/InputFormatBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/InputFormatBase.java similarity index 82% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/InputFormatBase.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/InputFormatBase.java index 677049e..df85117 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/InputFormatBase.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/InputFormatBase.java @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IsolatedScanner; @@ -28,32 +27,11 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -/** - * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of - * K,V pairs. - *

    - * Subclasses must implement a {@link #getRecordReader(InputSplit, JobConf, Reporter)} to provide a - * {@link RecordReader} for K,V. - *

    - * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} - * pairs, but one must implement its {@link RecordReaderBase#next(Object, Object)} to transform them - * to the desired generic types K,V. - *

    - * See {@link AccumuloInputFormat} for an example implementation. - */ -public abstract class InputFormatBase extends AbstractInputFormat { +public abstract class InputFormatBase extends AbstractInputFormat { /** * Sets the name of the input table, over which this job will scan. @@ -112,55 +90,15 @@ public abstract class InputFormatBase extends AbstractInputFormat { /** * Restricts the columns that will be mapped over for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If - * the column qualifier is null, the entire column family is selected. An empty set is - * the default and is equivalent to scanning the all columns. - * @since 1.5.0 */ public static void fetchColumns(JobConf job, - Collection> columnFamilyColumnQualifierPairs) { + Collection columnFamilyColumnQualifierPairs) { InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs); } /** - * Gets the columns to be mapped over from this job. - * - * @param job - * the Hadoop context for the configured job - * @return a set of columns - * @since 1.5.0 - * @see #fetchColumns(JobConf, Collection) - */ - protected static Set> getFetchedColumns(JobConf job) { - return InputConfigurator.getFetchedColumns(CLASS, job); - } - - /** - * Encode an iterator on the input for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param cfg - * the configuration of the iterator - * @since 1.5.0 - */ - public static void addIterator(JobConf job, IteratorSetting cfg) { - InputConfigurator.addIterator(CLASS, job, cfg); - } - - /** * Gets a list of the iterator settings (for iterators to apply to a scanner) from this * configuration. - * - * @param job - * the Hadoop context for the configured job - * @return a list of iterators - * @since 1.5.0 - * @see #addIterator(JobConf, IteratorSetting) */ protected static List getIterators(JobConf job) { return InputConfigurator.getIterators(CLASS, job); @@ -352,7 +290,7 @@ public abstract class InputFormatBase extends AbstractInputFormat { * @since 1.7.0 * @see #setBatchScan(JobConf, boolean) */ - public static boolean isBatchScan(JobConf job) { + protected static boolean isBatchScan(JobConf job) { return InputConfigurator.isBatchScan(CLASS, job); } @@ -385,7 +323,7 @@ public abstract class InputFormatBase extends AbstractInputFormat { InputConfigurator.setExecutionHints(CLASS, job, hints); } - protected abstract static class RecordReaderBase extends AbstractRecordReader { + public abstract static class RecordReaderBase extends AbstractRecordReader { @Override protected List jobIterators(JobConf job, String tableName) { diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/RangeInputSplit.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplit.java similarity index 91% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/RangeInputSplit.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplit.java index 9d163d7..d1d8a45 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/RangeInputSplit.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplit.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; import java.io.IOException; @@ -28,7 +28,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; */ @SuppressFBWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS", justification = "Intended to share code between mapred and mapreduce") -public class RangeInputSplit extends org.apache.accumulo.hadoop.mapreduce.RangeInputSplit +public class RangeInputSplit extends org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit implements InputSplit { public RangeInputSplit() { diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java similarity index 69% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AbstractInputFormat.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java index d0f3be1..749bec2 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AbstractInputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -45,21 +44,13 @@ import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -67,30 +58,26 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.hadoopImpl.mapreduce.BatchInputSplit; 
-import org.apache.accumulo.hadoopImpl.mapreduce.SplitUtils; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; +import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An abstract input format to provide shared methods common to all other input format classes. At * the very least, any classes inheriting from this class will need to define their own * {@link RecordReader}. */ -public abstract class AbstractInputFormat extends InputFormat { +public abstract class AbstractInputFormat { protected static final Class CLASS = AccumuloInputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); + private static final Logger log = LoggerFactory.getLogger(CLASS); /** * Sets the name of the classloader context on this scanner @@ -113,7 +100,7 @@ public abstract class AbstractInputFormat extends InputFormat { * @return name of the current context * @since 1.8.0 */ - public static String getClassLoaderContext(JobContext job) { + protected static String getClassLoaderContext(JobContext job) { return InputConfigurator.getClassLoaderContext(CLASS, job.getConfiguration()); } @@ -140,7 +127,7 @@ public abstract class AbstractInputFormat extends InputFormat { * URL (hdfs:// or http://) to Accumulo client properties file * @since 2.0.0 */ - public static void setClientPropertiesFile(Job job, String clientPropsFile) { + protected static void setClientPropertiesFile(Job job, String clientPropsFile) { InputConfigurator.setClientPropertiesFile(CLASS, job.getConfiguration(), clientPropsFile); } @@ -152,186 +139,11 @@ public abstract class AbstractInputFormat extends InputFormat { * @return ClientInfo * @since 2.0.0 */ - protected static ClientInfo getClientInfo(JobContext context) { + public static ClientInfo getClientInfo(JobContext context) { return InputConfigurator.getClientInfo(CLASS, context.getConfiguration()); } /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0; use {@link #setClientInfo(Job, ClientInfo)} instead. - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - if (token instanceof KerberosToken) { - log.info("Received KerberosToken, attempting to fetch DelegationToken"); - try { - AccumuloClient client = Accumulo.newClient().usingClientInfo(getClientInfo(job)) - .usingToken(principal, token).build(); - token = client.securityOperations().getDelegationToken(new DelegationTokenConfig()); - } catch (Exception e) { - log.warn("Failed to automatically obtain DelegationToken, " - + "Mappers/Reducers will likely fail to communicate with Accumulo", e); - } - } - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param tokenFile - * the path to the token file - * @since 1.6.0 - * @deprecated since 2.0.0, use {@link #setClientPropertiesFile(Job, String)} - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param context - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobContext context) { - return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); - } - - /** - * Gets the user name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static String getPrincipal(JobContext context) { - return InputConfigurator.getPrincipal(CLASS, context.getConfiguration()); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param context - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - * @see #setConnectorInfo(Job, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobContext context) { - AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, - context.getConfiguration()); - return ConfiguratorBase.unwrapAuthenticationToken(context, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * - * @param clientConfig - * client configuration containing connection options - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(Job, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(Job job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0, use {@link #getClientInfo(JobContext)} instead - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobContext context) { - return InputConfigurator.getInstance(CLASS, context.getConfiguration()); - } - - /** - * Sets the log level for this job. 
- * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(Job job, Level level) { - InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); - } - - /** - * Gets the log level from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(Job, Level) - */ - protected static Level getLogLevel(JobContext context) { - return InputConfigurator.getLogLevel(CLASS, context.getConfiguration()); - } - - /** * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a * subset of the user's authorization. Defaults to the empty set. * @@ -365,7 +177,7 @@ public abstract class AbstractInputFormat extends InputFormat { * @return the {@link InputTableConfig} objects for the job * @since 1.6.0 */ - protected static Map getInputTableConfigs(JobContext context) { + public static Map getInputTableConfigs(JobContext context) { return InputConfigurator.getInputTableConfigs(CLASS, context.getConfiguration()); } @@ -397,27 +209,12 @@ public abstract class AbstractInputFormat extends InputFormat { * if the context is improperly configured * @since 1.5.0 */ - protected static void validateOptions(JobContext context) throws IOException { + public static void validateOptions(JobContext context) throws IOException { AccumuloClient client = InputConfigurator.getClient(CLASS, context.getConfiguration()); InputConfigurator.validatePermissions(CLASS, context.getConfiguration(), client); } /** - * Construct the ClientConfiguration given the provided context. - * - * @param context - * The Job - * @return The ClientConfiguration - * @since 1.7.0 - * @deprecated since 2.0.0; use {@link #getClientInfo(JobContext)} instead - */ - @Deprecated - protected static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration( - JobContext context) { - return InputConfigurator.getClientConfiguration(CLASS, context.getConfiguration()); - } - - /** * An abstract base class to be used to create {@link org.apache.hadoop.mapreduce.RecordReader} * instances that convert from Accumulo * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to @@ -574,7 +371,7 @@ public abstract class AbstractInputFormat extends InputFormat { } - Collection> columns = split.getFetchedColumns(); + Collection columns = split.getFetchedColumns(); if (null == columns) { columns = tableConfig.getFetchedColumns(); } @@ -653,26 +450,14 @@ public abstract class AbstractInputFormat extends InputFormat { } } - Map>> binOfflineTable(JobContext context, Table.ID tableId, - List ranges) + public static Map>> binOfflineTable(JobContext context, + Table.ID tableId, List ranges) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { ClientContext clientContext = new ClientContext(getClientInfo(context)); return InputConfigurator.binOffline(tableId, ranges, clientContext); } - /** - * Gets the splits of the tables that have been set on the job by reading the metadata table for - * the specified ranges. - * - * @return the splits from the tables based on the ranges. 
- * @throws java.io.IOException - * if a table set on the job doesn't exist or an error occurs initializing the tablet - * locator - */ - @Override - public List getSplits(JobContext context) throws IOException { - Level logLevel = getLogLevel(context); - log.setLevel(logLevel); + public static List getSplits(JobContext context) throws IOException { validateOptions(context); Random random = new SecureRandom(); LinkedList splits = new LinkedList<>(); @@ -772,7 +557,7 @@ public abstract class AbstractInputFormat extends InputFormat { clippedRanges.add(ke.clip(r)); BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location}); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); splits.add(split); } else { @@ -782,7 +567,7 @@ public abstract class AbstractInputFormat extends InputFormat { // divide ranges into smaller ranges, based on the tablets RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), ke.clip(r), new String[] {location}); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); @@ -804,7 +589,7 @@ public abstract class AbstractInputFormat extends InputFormat { for (Map.Entry> entry : splitsToAdd.entrySet()) { RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), entry.getKey(), entry.getValue().toArray(new String[0])); - SplitUtils.updateSplit(split, tableConfig, logLevel); + SplitUtils.updateSplit(split, tableConfig); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloFileOutputFormatImpl.java similarity index 63% copy from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java copy to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloFileOutputFormatImpl.java index a47c5aa..a163de4 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloFileOutputFormatImpl.java @@ -14,46 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; -import java.io.IOException; - -import org.apache.accumulo.core.client.rfile.RFile; -import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; +import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat; import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.log4j.Logger; -/** - * This class allows MapReduce jobs to write output in the Accumulo data file format.
- * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important
- * requirement of Accumulo data files.
- *
- * <p>
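The deleted javadoc here described configuring output only through FileOutputFormat.setOutputPath. For contrast, a minimal sketch (under assumptions) of the fluent FileOutputInfo builder this commit adds further down in FileOutputInfoImpl; the output path and codec name are placeholders, and it is assumed the chained OutputOptions steps and build() are exposed as the implementation suggests, with the resulting object handed to the job by a setter not shown in this excerpt.

    import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo;
    import org.apache.accumulo.hadoopImpl.mapreduce.FileOutputInfoImpl;
    import org.apache.hadoop.fs.Path;

    public class FileOutputInfoSketch {
      // Describes where and how the job's Accumulo data files should be written.
      static FileOutputInfo fileOutputOptions() {
        return new FileOutputInfoImpl.FileOutputInfoBuilderImpl()
            .outputPath(new Path("/tmp/bulk-output")) // placeholder HDFS directory
            .compressionType("gz")                    // assumed-valid codec name
            .replication(3)
            .build();
      }
    }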
    - * The output path to be created must be specified via - * {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from - * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} - * are not supported and may be ignored or cause failures. Using other Hadoop configuration options - * that affect the behavior of the underlying files directly in the Job's configuration may work, - * but are not directly supported at this time. - */ -public class AccumuloFileOutputFormat extends FileOutputFormat { +public class AccumuloFileOutputFormatImpl { private static final Class CLASS = AccumuloFileOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); /** * Sets the compression type to use for data blocks. Specifying a compression may require @@ -159,36 +131,4 @@ public class AccumuloFileOutputFormat extends FileOutputFormat { public static void setSummarizers(Job job, SummarizerConfiguration... summarizerConfigs) { FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), summarizerConfigs); } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { - // get the path of the temporary output file - final Configuration conf = context.getConfiguration(); - final AccumuloConfiguration acuConf = FileOutputConfigurator.getAccumuloConfiguration(CLASS, - context.getConfiguration()); - - final String extension = acuConf.get(Property.TABLE_FILE_TYPE); - final Path file = this.getDefaultWorkFile(context, "." + extension); - final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf); - - return new RecordWriter() { - RFileWriter out = null; - - @Override - public void close(TaskAttemptContext context) throws IOException { - if (out != null) - out.close(); - } - - @Override - public void write(Key key, Value value) throws IOException { - if (out == null) { - out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)) - .withTableProperties(acuConf).withVisibilityCacheSize(visCacheSize).build(); - out.startDefaultLocalityGroup(); - } - out.append(key, value); - } - }; - } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java similarity index 58% copy from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java copy to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java index 6236424..54a0042 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloOutputFormatImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import java.io.IOException; import java.util.HashMap; @@ -33,31 +33,21 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; -import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.security.SecurityErrorCode; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.DelegationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.hadoopImpl.mapreduce.lib.ConfiguratorBase; +import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} @@ -67,15 +57,15 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * *

- * <li>{@link AccumuloOutputFormat#setClientInfo(Job, ClientInfo)}
+ * <li>{@link AccumuloOutputFormatImpl#setClientInfo(Job, ClientInfo)}
 * </ul>
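The fluent alternative to these static setters is the OutputInfo builder added later in this diff (OutputInfoImpl). A minimal, hedged sketch of assembling one follows; how the ClientInfo is obtained and how the finished OutputInfo is handed to the job are not shown in this excerpt and are assumed.

    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.hadoop.mapreduce.OutputInfo;
    import org.apache.accumulo.hadoopImpl.mapreduce.OutputInfoImpl;

    public class OutputInfoSketch {
      // Collects connection and write options for AccumuloOutputFormat in one object.
      static OutputInfo outputOptions(ClientInfo info) {
        return new OutputInfoImpl.OutputInfoBuilderImpl()
            .clientInfo(info)                     // required connection information
            .defaultTableName("sink_table")       // placeholder table for null-table mutations
            .batchWriterOptions(new BatchWriterConfig())
            .enableCreateTables()                 // create missing tables on first write
            .build();
      }
    }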
    * * Other static methods are optional. */ -public class AccumuloOutputFormat extends OutputFormat { +public class AccumuloOutputFormatImpl { private static final Class CLASS = AccumuloOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); + private static final Logger log = LoggerFactory.getLogger(CLASS); /** * Set the connection information needed to communicate with Accumulo in this job. @@ -112,177 +102,11 @@ public class AccumuloOutputFormat extends OutputFormat { * URL to Accumulo client properties file * @since 2.0.0 */ - public static void setClientPropertiesFile(Job job, String clientPropsFile) { + protected static void setClientPropertiesFile(Job job, String clientPropsFile) { OutputConfigurator.setClientPropertiesFile(CLASS, job.getConfiguration(), clientPropsFile); } /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * WARNING: Some tokens, when serialized, divulge sensitive information in the - * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 - * encoded to provide a charset safe conversion to a string, but this conversion is not intended - * to be secure. {@link PasswordToken} is one example that is insecure in this way; however - * {@link DelegationToken}s, acquired using - * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, is not subject to this - * concern. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(Job, boolean)} is set to true) - * @param token - * the user's password - * @since 1.5.0 - * @deprecated since 2.0.0, replaced by {@link #setClientInfo(Job, ClientInfo)} - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - // DelegationTokens can be passed securely from user to task without serializing insecurely in - // the configuration - if (token instanceof DelegationTokenImpl) { - DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; - - // Convert it into a Hadoop Token - AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - Token hadoopToken = new Token<>(identifier.getBytes(), - delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - - // Add the Hadoop Token to the Job so it gets serialized and passed along. - job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); - } - - OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

    - * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt - * to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if - * {@link #setCreateTables(Job, boolean)} is set to true) - * @param tokenFile - * the path to the token file - * @since 1.6.0 - * @deprecated since 2.0.0, replaced by {@link #setClientPropertiesFile(Job, String)} - */ - @Deprecated - public static void setConnectorInfo(Job job, String principal, String tokenFile) - throws AccumuloSecurityException { - setClientPropertiesFile(job, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param context - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobContext context) { - return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); - } - - /** - * Gets the user name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static String getPrincipal(JobContext context) { - return OutputConfigurator.getPrincipal(CLASS, context.getConfiguration()); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the - * configuration, whichever was used when the job was configured. - * - * @param context - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - * @see #setConnectorInfo(Job, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobContext context) { - AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, - context.getConfiguration()); - return ConfiguratorBase.unwrapAuthenticationToken(context, token); - } - - /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. - * @since 1.6.0 - * @deprecated since 2.0.0; Use {@link #setClientInfo(Job, ClientInfo)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(Job job, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @deprecated since 2.0.0; Use {@link #getClientInfo(JobContext)} instead. - */ - @Deprecated - protected static org.apache.accumulo.core.client.Instance getInstance(JobContext context) { - return OutputConfigurator.getInstance(CLASS, context.getConfiguration()); - } - - /** - * Sets the log level for this job. 
- * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(Job job, Level level) { - OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); - } - - /** - * Gets the log level from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(Job, Level) - */ - protected static Level getLogLevel(JobContext context) { - return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration()); - } - - /** * Sets the default table name to use if one emits a null in place of a table name for a given * mutation. Table names can only be alpha-numeric and underscores. * @@ -333,7 +157,7 @@ public class AccumuloOutputFormat extends OutputFormat { * @since 1.5.0 * @see #setBatchWriterOptions(Job, BatchWriterConfig) */ - protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { + public static BatchWriterConfig getBatchWriterOptions(JobContext context) { return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration()); } @@ -400,7 +224,7 @@ public class AccumuloOutputFormat extends OutputFormat { /** * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. */ - protected static class AccumuloRecordWriter extends RecordWriter { + public static class AccumuloRecordWriter extends RecordWriter { private MultiTableBatchWriter mtbw = null; private HashMap bws = null; private Text defaultTableName = null; @@ -413,11 +237,8 @@ public class AccumuloOutputFormat extends OutputFormat { private AccumuloClient client; - protected AccumuloRecordWriter(TaskAttemptContext context) - throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(context); - if (l != null) - log.setLevel(getLogLevel(context)); + public AccumuloRecordWriter(TaskAttemptContext context) + throws AccumuloException, AccumuloSecurityException { this.simulate = getSimulationMode(context); this.createTables = canCreateTables(context); @@ -437,8 +258,8 @@ public class AccumuloOutputFormat extends OutputFormat { /** * Push a mutation into a table. If table is null, the defaultTable will be used. If - * {@link AccumuloOutputFormat#canCreateTables(JobContext)} is set, the table will be created if - * it does not exist. The table name must only contain alphanumerics and underscore. + * {@link AccumuloOutputFormatImpl#canCreateTables(JobContext)} is set, the table will be + * created if it does not exist. The table name must only contain alphanumerics and underscore. 
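To make the write() contract above concrete, a hypothetical reducer that emits (table, mutation) pairs to this output format; passing a null table name routes the mutation to the job's configured default table. The column names and value are placeholders.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CountReducer extends Reducer<Text,Value,Text,Mutation> {
      @Override
      protected void reduce(Text row, Iterable<Value> values, Context ctx)
          throws IOException, InterruptedException {
        Mutation m = new Mutation(row);
        m.put(new Text("count"), new Text(""), new Value("1".getBytes(StandardCharsets.UTF_8)));
        ctx.write(null, m); // null table -> default table configured for the job
      }
    }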
*/ @Override public void write(Text table, Mutation mutation) throws IOException { @@ -561,35 +382,4 @@ public class AccumuloOutputFormat extends OutputFormat { } } - @Override - public void checkOutputSpecs(JobContext job) throws IOException { - if (!isConnectorInfoSet(job)) - throw new IOException("Connector info has not been set."); - try { - // if the instance isn't configured, it will complain here - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); - AccumuloClient c = Accumulo.newClient().usingClientInfo(getClientInfo(job)).build(); - if (!c.securityOperations().authenticateUser(principal, token)) - throw new IOException("Unable to authenticate user"); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - return new NullOutputFormat().getOutputCommitter(context); - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext attempt) - throws IOException { - try { - return new AccumuloRecordWriter(attempt); - } catch (Exception e) { - throw new IOException(e); - } - } - } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplit.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplit.java index 40b6326..77fb5f0 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplit.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplit.java @@ -28,7 +28,6 @@ import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.hadoop.mapreduce.RangeInputSplit; /** * The Class BatchInputSplit. Encapsulates a set of Accumulo ranges on a single tablet for use in diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java new file mode 100644 index 0000000..64d300e --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/FileOutputInfoImpl.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoopImpl.mapreduce; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo; +import org.apache.hadoop.fs.Path; + +public class FileOutputInfoImpl implements FileOutputInfo { + Path outputPath; + Optional comp; + Optional dataBlockSize; + Optional fileBlockSize; + Optional indexBlockSize; + Optional replication; + Optional sampler; + Collection summarizers; + + public FileOutputInfoImpl(Path outputPath, Optional comp, Optional dataBlockSize, + Optional fileBlockSize, Optional indexBlockSize, Optional replication, + Optional sampler, Collection summarizers) { + this.outputPath = outputPath; + this.comp = comp; + this.dataBlockSize = dataBlockSize; + this.fileBlockSize = fileBlockSize; + this.indexBlockSize = indexBlockSize; + this.replication = replication; + this.sampler = sampler; + this.summarizers = summarizers; + } + + @Override + public Path getOutputPath() { + return outputPath; + } + + @Override + public Optional getCompressionType() { + return comp; + } + + @Override + public Optional getDataBlockSize() { + return dataBlockSize; + } + + @Override + public Optional getFileBlockSize() { + return fileBlockSize; + } + + @Override + public Optional getIndexBlockSize() { + return indexBlockSize; + } + + @Override + public Optional getReplication() { + return replication; + } + + @Override + public Optional getSampler() { + return sampler; + } + + @Override + public Collection getSummarizers() { + return summarizers; + } + + public static class FileOutputInfoBuilderImpl implements FileOutputInfoBuilder, + FileOutputInfoBuilder.PathParams, FileOutputInfoBuilder.OutputOptions { + Path outputPath; + Optional comp = Optional.empty(); + Optional dataBlockSize = Optional.empty(); + Optional fileBlockSize = Optional.empty(); + Optional indexBlockSize = Optional.empty(); + Optional replication = Optional.empty(); + Optional sampler = Optional.empty(); + Collection summarizers = Collections.emptySet(); + + @Override + public OutputOptions outputPath(Path path) { + this.outputPath = Objects.requireNonNull(path); + ; + return this; + } + + @Override + public OutputOptions compressionType(String compressionType) { + this.comp = Optional.of(compressionType); + return this; + } + + @Override + public OutputOptions dataBlockSize(long dataBlockSize) { + this.dataBlockSize = Optional.of(dataBlockSize); + return this; + } + + @Override + public OutputOptions fileBlockSize(long fileBlockSize) { + this.fileBlockSize = Optional.of(fileBlockSize); + return this; + } + + @Override + public OutputOptions indexBlockSize(long indexBlockSize) { + this.indexBlockSize = Optional.of(indexBlockSize); + return this; + } + + @Override + public OutputOptions replication(int replication) { + this.replication = Optional.of(replication); + return this; + } + + @Override + public OutputOptions sampler(SamplerConfiguration samplerConfig) { + this.sampler = Optional.of(samplerConfig); + return this; + } + + @Override + public OutputOptions summarizers(SummarizerConfiguration... 
summarizerConfigs) { + this.summarizers = Arrays.asList(Objects.requireNonNull(summarizerConfigs)); + return this; + } + + @Override + public FileOutputInfo build() { + return new FileOutputInfoImpl(outputPath, comp, dataBlockSize, fileBlockSize, indexBlockSize, + replication, sampler, summarizers); + } + } +} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBase.java similarity index 80% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBase.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBase.java index 868b81e..ae9d0bd 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBase.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBase.java @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IsolatedScanner; @@ -28,33 +27,13 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -/** - * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of - * K,V pairs. - *
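Before the trimmed-down InputFormatBase below, a hedged sketch of describing a job's input with the fluent builder this commit introduces (see InputInfoImpl further down). The table name, range and iterator are placeholders, and it is assumed the public InputInfo entry point mirrors InputInfoBuilderImpl, with the finished object passed to the job by a setter not shown in this excerpt.

    import java.util.Collections;

    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.iterators.user.WholeRowIterator;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.InputInfo;
    import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl;

    public class InputInfoSketch {
      // Describes the table, authorizations, ranges and iterators the job will scan.
      static InputInfo inputOptions(ClientInfo info) {
        return new InputInfoImpl.InputInfoBuilderImpl()
            .clientInfo(info)
            .table("source_table")                                 // placeholder input table
            .scanAuths(Authorizations.EMPTY)
            .ranges(Collections.singletonList(new Range("a", "m")))
            .addIterator(new IteratorSetting(50, "wholeRow", WholeRowIterator.class))
            .build();
      }
    }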

- * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to
- * provide a {@link RecordReader} for K,V.
- * <p>

- * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value}
- * pairs, but one must implement its {@link RecordReaderBase#nextKeyValue()} to transform them to
- * the desired generic types K,V.
- * <p>

    - * See {@link AccumuloInputFormat} for an example implementation. - */ -public abstract class InputFormatBase extends AbstractInputFormat { +public abstract class InputFormatBase extends AbstractInputFormat { /** * Gets the table name from the configuration. @@ -110,56 +89,8 @@ public abstract class InputFormatBase extends AbstractInputFormat { } /** - * Restricts the columns that will be mapped over for this job for the default input table. - * - * @param job - * the Hadoop job instance to be configured - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If - * the column qualifier is null, the entire column family is selected. An empty set is - * the default and is equivalent to scanning the all columns. - * @since 1.5.0 - */ - public static void fetchColumns(Job job, - Collection> columnFamilyColumnQualifierPairs) { - InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs); - } - - /** - * Gets the columns to be mapped over from this job. - * - * @param context - * the Hadoop context for the configured job - * @return a set of columns - * @since 1.5.0 - * @see #fetchColumns(Job, Collection) - */ - protected static Set> getFetchedColumns(JobContext context) { - return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration()); - } - - /** - * Encode an iterator on the single input table for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param cfg - * the configuration of the iterator - * @since 1.5.0 - */ - public static void addIterator(Job job, IteratorSetting cfg) { - InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg); - } - - /** * Gets a list of the iterator settings (for iterators to apply to a scanner) from this * configuration. - * - * @param context - * the Hadoop context for the configured job - * @return a list of iterators - * @since 1.5.0 - * @see #addIterator(Job, IteratorSetting) */ protected static List getIterators(JobContext context) { return InputConfigurator.getIterators(CLASS, context.getConfiguration()); @@ -351,7 +282,7 @@ public abstract class InputFormatBase extends AbstractInputFormat { * @since 1.7.0 * @see #setBatchScan(Job, boolean) */ - public static boolean isBatchScan(JobContext context) { + protected static boolean isBatchScan(JobContext context) { return InputConfigurator.isBatchScan(CLASS, context.getConfiguration()); } @@ -384,7 +315,7 @@ public abstract class InputFormatBase extends AbstractInputFormat { InputConfigurator.setExecutionHints(CLASS, job.getConfiguration(), hints); } - protected abstract static class RecordReaderBase extends AbstractRecordReader { + public abstract static class RecordReaderBase extends AbstractRecordReader { @Override protected List contextIterators(TaskAttemptContext context, String tableName) { diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java new file mode 100644 index 0000000..878148a --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputInfoImpl.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.hadoopImpl.mapreduce; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class InputInfoImpl implements InputInfo { + String tableName; + ClientInfo clientInfo; + Authorizations scanAuths; + + // optional values + Optional context; + Collection ranges; + Collection fetchColumns; + Map iterators; + Optional samplerConfig; + Map hints; + InputInfoBooleans bools; + + public InputInfoImpl(String tableName, ClientInfo clientInfo, Authorizations scanAuths, + Optional context, Collection ranges, + Collection fetchColumns, Map iterators, + Optional samplerConfig, Map hints, + InputInfoBooleans bools) { + this.tableName = tableName; + this.clientInfo = clientInfo; + this.scanAuths = scanAuths; + this.context = context; + this.ranges = ranges; + this.fetchColumns = fetchColumns; + this.iterators = iterators; + this.samplerConfig = samplerConfig; + this.hints = hints; + this.bools = bools; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public ClientInfo getClientInfo() { + return clientInfo; + } + + public Authorizations getScanAuths() { + return scanAuths; + } + + @Override + public Optional getContext() { + return context; + } + + @Override + public Collection getRanges() { + return ranges; + } + + @Override + public Collection getFetchColumns() { + return fetchColumns; + } + + @Override + public Collection getIterators() { + return iterators.values(); + } + + @Override + public Optional getSamplerConfig() { + return samplerConfig; + } + + @Override + public Map getExecutionHints() { + return hints; + } + + @Override + public boolean isAutoAdjustRanges() { + return bools.autoAdjustRanges; + } + + @Override + public boolean isScanIsolation() { + return bools.scanIsolation; + } + + @Override + public boolean isLocalIterators() { + return bools.localIters; + } + + @Override + public boolean isOfflineScan() { + return bools.offlineScan; + } + + @Override + public boolean isBatchScan() { + return bools.batchScan; + } + + private static class InputInfoBooleans { + boolean autoAdjustRanges = true; + boolean scanIsolation = false; + boolean offlineScan = false; + boolean localIters = false; + boolean batchScan = false; + } + + public static class InputInfoBuilderImpl + implements InputInfoBuilder, InputInfoBuilder.ClientParams, InputInfoBuilder.TableParams, + InputInfoBuilder.AuthsParams, 
InputInfoBuilder.InputFormatOptions, + InputInfoBuilder.ScanOptions, InputInfoBuilder.BatchScanOptions { + + String tableName; + ClientInfo clientInfo; + Authorizations scanAuths; + + Optional context = Optional.empty(); + Collection ranges = Collections.emptyList(); + Collection fetchColumns = Collections.emptyList(); + Map iterators = Collections.emptyMap(); + Optional samplerConfig = Optional.empty(); + Map hints = Collections.emptyMap(); + InputInfoBooleans bools = new InputInfoBooleans(); + + @Override + public InputInfoBuilder.TableParams clientInfo(ClientInfo clientInfo) { + this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null"); + return this; + } + + @Override + public InputInfoBuilder.AuthsParams table(String tableName) { + this.tableName = Objects.requireNonNull(tableName, "Table name must not be null"); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions scanAuths(Authorizations auths) { + this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null"); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions classLoaderContext(String context) { + this.context = Optional.of(context); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions ranges(Collection ranges) { + this.ranges = ImmutableList + .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null")); + if (this.ranges.size() == 0) + throw new IllegalArgumentException("Specified collection of ranges is empty."); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions fetchColumns( + Collection fetchColumns) { + this.fetchColumns = ImmutableList + .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null")); + if (this.fetchColumns.size() == 0) + throw new IllegalArgumentException("Specified collection of fetch columns is empty."); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions addIterator(IteratorSetting cfg) { + // store iterators by name to prevent duplicates + Objects.requireNonNull(cfg, "IteratorSetting must not be null."); + if (this.iterators.size() == 0) + this.iterators = new LinkedHashMap<>(); + this.iterators.put(cfg.getName(), cfg); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions executionHints(Map hints) { + this.hints = ImmutableMap + .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null.")); + if (hints.size() == 0) + throw new IllegalArgumentException("Specified map of execution hints is empty."); + return this; + } + + @Override + public InputInfoBuilder.InputFormatOptions samplerConfiguration( + SamplerConfiguration samplerConfig) { + this.samplerConfig = Optional.of(samplerConfig); + return this; + } + + @Override + public InputFormatOptions disableAutoAdjustRanges() { + bools.autoAdjustRanges = false; + return this; + } + + @Override + public ScanOptions scanIsolation() { + bools.scanIsolation = true; + return this; + } + + @Override + public ScanOptions localIterators() { + bools.localIters = true; + return this; + } + + @Override + public ScanOptions offlineScan() { + bools.offlineScan = true; + return this; + } + + @Override + public BatchScanOptions batchScan() { + bools.batchScan = true; + bools.autoAdjustRanges = true; + return this; + } + + @Override + public InputInfo build() { + return new InputInfoImpl(tableName, clientInfo, scanAuths, context, ranges, fetchColumns, + iterators, samplerConfig, hints, bools); + } + } +} diff --git 
a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfig.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java similarity index 97% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfig.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java index 0875398..c90c92f 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfig.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import java.io.DataInput; import java.io.DataOutput; @@ -45,7 +45,7 @@ public class InputTableConfig implements Writable { private List iterators; private List ranges; - private Collection> columns; + private Collection columns; private boolean autoAdjustRanges = true; private boolean useLocalIterators = false; @@ -95,7 +95,7 @@ public class InputTableConfig implements Writable { * the default and is equivalent to scanning the all columns. * @since 1.6.0 */ - public InputTableConfig fetchColumns(Collection> columns) { + public InputTableConfig fetchColumns(Collection columns) { this.columns = columns; return this; } @@ -103,7 +103,7 @@ public class InputTableConfig implements Writable { /** * Returns the columns to be fetched for this configuration */ - public Collection> getFetchedColumns() { + public Collection getFetchedColumns() { return columns != null ? columns : new HashSet<>(); } @@ -377,11 +377,11 @@ public class InputTableConfig implements Writable { Text colFam = new Text(); colFam.readFields(dataInput); if (numPairs == 1) { - columns.add(new Pair<>(colFam, null)); + columns.add(new IteratorSetting.Column(colFam, null)); } else if (numPairs == 2) { Text colQual = new Text(); colQual.readFields(dataInput); - columns.add(new Pair<>(colFam, colQual)); + columns.add(new IteratorSetting.Column(colFam, colQual)); } } autoAdjustRanges = dataInput.readBoolean(); diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java new file mode 100644 index 0000000..27c94d1 --- /dev/null +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputInfoImpl.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoopImpl.mapreduce; + +import java.util.Objects; +import java.util.Optional; + +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.hadoop.mapreduce.OutputInfo; + +public class OutputInfoImpl implements OutputInfo { + ClientInfo clientInfo; + + // optional values + Optional defaultTableName; + Optional bwConfig; + boolean createTables; + boolean simulationMode; + + public OutputInfoImpl(ClientInfo ci, Optional defaultTableName, + Optional bwConfig, boolean createTables, boolean simulationMode) { + this.clientInfo = ci; + this.defaultTableName = defaultTableName; + this.bwConfig = bwConfig; + this.createTables = createTables; + this.simulationMode = simulationMode; + } + + @Override + public ClientInfo getClientInfo() { + return clientInfo; + } + + @Override + public Optional getBatchWriterOptions() { + return bwConfig; + } + + @Override + public Optional getDefaultTableName() { + return defaultTableName; + } + + @Override + public boolean isCreateTables() { + return createTables; + } + + @Override + public boolean isSimulationMode() { + return simulationMode; + } + + public static class OutputInfoBuilderImpl implements OutputInfoBuilder, + OutputInfoBuilder.ClientParams, OutputInfoBuilder.OutputOptions { + ClientInfo clientInfo; + + // optional values + Optional defaultTableName = Optional.empty(); + Optional bwConfig = Optional.empty(); + boolean createTables = false; + boolean simulationMode = false; + + @Override + public OutputOptions clientInfo(ClientInfo clientInfo) { + this.clientInfo = Objects.requireNonNull(clientInfo, "ClientInfo must not be null"); + return this; + } + + @Override + public OutputOptions batchWriterOptions(BatchWriterConfig bwConfig) { + this.bwConfig = Optional.of(bwConfig); + return this; + } + + @Override + public OutputOptions defaultTableName(String tableName) { + this.defaultTableName = Optional.of(tableName); + return this; + } + + @Override + public OutputOptions enableCreateTables() { + this.createTables = true; + return this; + } + + @Override + public OutputOptions enableSimulationMode() { + this.simulationMode = true; + return this; + } + + @Override + public OutputInfo build() { + return new OutputInfoImpl(clientInfo, defaultTableName, bwConfig, createTables, + simulationMode); + } + } +} diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplit.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplit.java similarity index 90% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplit.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplit.java index a3f0010..e0751e2 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplit.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplit.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import java.io.DataInput; import java.io.DataOutput; @@ -36,13 +36,9 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.hadoopImpl.mapreduce.SplitUtils; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.log4j.Level; /** * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. @@ -52,10 +48,9 @@ public class RangeInputSplit extends InputSplit implements Writable { private String[] locations; private String tableId, tableName; private Boolean offline, isolatedScan, localIterators; - private Set> fetchedColumns; + private Set fetchedColumns; private List iterators; private SamplerConfiguration samplerConfig; - private Level level; private Map executionHints; public RangeInputSplit() { @@ -72,7 +67,7 @@ public class RangeInputSplit extends InputSplit implements Writable { this.setTableId(split.getTableId()); } - protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { + public RangeInputSplit(String table, String tableId, Range range, String[] locations) { this.range = range; setLocations(locations); this.tableName = table; @@ -167,10 +162,6 @@ public class RangeInputSplit extends InputSplit implements Writable { } if (in.readBoolean()) { - level = Level.toLevel(in.readInt()); - } - - if (in.readBoolean()) { samplerConfig = new SamplerConfigurationImpl(in).toSamplerConfiguration(); } @@ -224,11 +215,6 @@ public class RangeInputSplit extends InputSplit implements Writable { } } - out.writeBoolean(null != level); - if (null != level) { - out.writeInt(level.toInt()); - } - out.writeBoolean(null != samplerConfig); if (null != samplerConfig) { new SamplerConfigurationImpl(samplerConfig).write(out); @@ -293,18 +279,18 @@ public class RangeInputSplit extends InputSplit implements Writable { this.localIterators = localIterators; } - public Set> getFetchedColumns() { + public Set getFetchedColumns() { return fetchedColumns; } - public void setFetchedColumns(Collection> fetchedColumns) { + public void setFetchedColumns(Collection fetchedColumns) { this.fetchedColumns = new HashSet<>(); - for (Pair columns : fetchedColumns) { + for (IteratorSetting.Column columns : fetchedColumns) { this.fetchedColumns.add(columns); } } - public void setFetchedColumns(Set> fetchedColumns) { + public void setFetchedColumns(Set fetchedColumns) { this.fetchedColumns = fetchedColumns; } @@ -316,14 +302,6 @@ public class RangeInputSplit extends InputSplit implements Writable { this.iterators = iterators; } - public Level getLogLevel() { - return level; - } - - public void setLogLevel(Level level) { - this.level = level; - } - @Override public String toString() { StringBuilder sb = new StringBuilder(256); @@ -336,7 +314,6 @@ public class RangeInputSplit extends InputSplit implements Writable { sb.append(" localIterators: ").append(localIterators); sb.append(" fetchColumns: ").append(fetchedColumns); sb.append(" iterators: ").append(iterators); - sb.append(" logLevel: ").append(level); sb.append(" samplerConfig: ").append(samplerConfig); sb.append(" executionHints: 
").append(executionHints); return sb.toString(); diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/SplitUtils.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/SplitUtils.java index 6f0c8a0..44855fb 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/SplitUtils.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/SplitUtils.java @@ -22,10 +22,7 @@ import java.math.BigInteger; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.hadoop.mapreduce.InputTableConfig; -import org.apache.accumulo.hadoop.mapreduce.RangeInputSplit; import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; public class SplitUtils { @@ -33,11 +30,9 @@ public class SplitUtils { * Central place to set common split configuration not handled by split constructors. The * intention is to make it harder to miss optional setters in future refactor. */ - public static void updateSplit(RangeInputSplit split, InputTableConfig tableConfig, - Level logLevel) { + public static void updateSplit(RangeInputSplit split, InputTableConfig tableConfig) { split.setFetchedColumns(tableConfig.getFetchedColumns()); split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); split.setSamplerConfiguration(tableConfig.getSamplerConfiguration()); split.setExecutionHints(tableConfig.getExecutionHints()); } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java index c26dbff..3032d27 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; -import org.apache.accumulo.core.client.impl.ClientConfConverter; import org.apache.accumulo.core.client.impl.ClientInfoImpl; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -54,15 +53,15 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @since 1.6.0 */ public class ConfiguratorBase { - protected static final Logger log = Logger.getLogger(ConfiguratorBase.class); + private static final Logger log = LoggerFactory.getLogger(ConfiguratorBase.class); /** * Specifies that connection info was configured @@ -298,50 +297,6 @@ public class ConfiguratorBase { } /** - * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. 
- * @since 1.6.0 - * @deprecated since 2.0.0; use {@link #setClientInfo(Class, Configuration, ClientInfo)} instead - */ - @Deprecated - public static void setZooKeeperInstance(Class implementingClass, Configuration conf, - org.apache.accumulo.core.client.ClientConfiguration clientConfig) { - Properties props = getClientProperties(implementingClass, conf); - Properties newProps = ClientConfConverter.toProperties(clientConfig); - for (Object keyObj : newProps.keySet()) { - String propKey = (String) keyObj; - String val = newProps.getProperty(propKey); - props.setProperty(propKey, val); - } - setClientProperties(implementingClass, conf, props); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the - * configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return an Accumulo instance - * @since 1.6.0 - * @deprecated since 2.0.0, replaced by {@link #getClientInfo(Class, Configuration)} - */ - @Deprecated - public static org.apache.accumulo.core.client.Instance getInstance(Class implementingClass, - Configuration conf) { - return org.apache.accumulo.core.client.Connector.from(getClient(implementingClass, conf)) - .getInstance(); - } - - /** * Creates an Accumulo {@link AccumuloClient} based on the configuration * * @param implementingClass @@ -360,57 +315,6 @@ public class ConfiguratorBase { } /** - * Obtain a ClientConfiguration based on the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * - * @return A ClientConfiguration - * @since 1.7.0 - * @deprecated since 2.0.0; use {@link #getClientInfo(Class, Configuration)} instead - */ - @Deprecated - public static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration( - Class implementingClass, Configuration conf) { - return ClientConfConverter.toClientConf(getClientInfo(implementingClass, conf).getProperties()); - } - - /** - * Sets the log level for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param level - * the logging level - * @since 1.6.0 - */ - public static void setLogLevel(Class implementingClass, Configuration conf, Level level) { - checkArgument(level != null, "level is null"); - Logger.getLogger(implementingClass).setLevel(level); - conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt()); - } - - /** - * Gets the log level from this configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the log level - * @since 1.6.0 - * @see #setLogLevel(Class, Configuration, Level) - */ - public static Level getLogLevel(Class implementingClass, Configuration conf) { - return Level.toLevel( - conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt())); - } - - /** * Sets the valid visibility count for this job. 
* * @param conf diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/FileOutputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/FileOutputConfigurator.java index ee26f9a..5f4dc60 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/FileOutputConfigurator.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/FileOutputConfigurator.java @@ -29,12 +29,16 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @since 1.6.0 */ public class FileOutputConfigurator extends ConfiguratorBase { + private static final Logger log = LoggerFactory.getLogger(FileOutputConfigurator.class); + /** * Configuration keys for {@link AccumuloConfiguration}. * @@ -85,11 +89,12 @@ public class FileOutputConfigurator extends ConfiguratorBase { Property property, T value) { if (isSupportedAccumuloProperty(property)) { String val = String.valueOf(value); - if (property.getType().isValidFormat(val)) - conf.set( - enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + property.getKey(), - val); - else + if (property.getType().isValidFormat(val)) { + String key = enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + + property.getKey(); + log.debug("Setting accumulo property {} = {} ", key, val); + conf.set(key, val); + } else throw new IllegalArgumentException( "Value is not appropriate for property type '" + property.getType() + "'"); } else diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java index 7f84729..3babfd6 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java @@ -67,7 +67,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; -import org.apache.accumulo.hadoop.mapreduce.InputTableConfig; +import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -273,13 +273,7 @@ public class InputConfigurator extends ConfiguratorBase { * Gets a list of the iterator settings (for iterators to apply to a scanner) from this * configuration. * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return a list of iterators - * @since 1.6.0 - * @see #addIterator(Class, Configuration, IteratorSetting) + * @see #writeIteratorsToConf(Class, Configuration, Collection) */ public static List getIterators(Class implementingClass, Configuration conf) { String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS)); @@ -306,21 +300,9 @@ public class InputConfigurator extends ConfiguratorBase { /** * Restricts the columns that will be mapped over for the single input table on this job. 
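Because this commit replaces Pair-based column selection with IteratorSetting.Column, a brief sketch of restricting a scan to a single column; AccumuloInputFormat.class as the implementing class and the column names are illustrative only.

    import java.util.Collections;

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
    import org.apache.hadoop.conf.Configuration;

    public class FetchColumnsSketch {
      // Restrict the job's scan to the "meta" family, "size" qualifier.
      static void restrictColumns(Configuration conf) {
        IteratorSetting.Column col = new IteratorSetting.Column("meta", "size");
        InputConfigurator.fetchColumns(AccumuloInputFormat.class, conf,
            Collections.singleton(col));
      }
    }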
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If - * the column qualifier is null, the entire column family is selected. An empty set is - * the default and is equivalent to scanning the all columns. - * @throws IllegalArgumentException - * if the column family is null - * @since 1.6.0 */ public static void fetchColumns(Class implementingClass, Configuration conf, - Collection> columnFamilyColumnQualifierPairs) { + Collection columnFamilyColumnQualifierPairs) { checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs); @@ -328,7 +310,7 @@ public class InputConfigurator extends ConfiguratorBase { } public static String[] serializeColumns( - Collection> columnFamilyColumnQualifierPairs) { + Collection columnFamilyColumnQualifierPairs) { checkArgument(columnFamilyColumnQualifierPairs != null, "columnFamilyColumnQualifierPairs is null"); ArrayList columnStrings = new ArrayList<>(columnFamilyColumnQualifierPairs.size()); @@ -349,15 +331,9 @@ public class InputConfigurator extends ConfiguratorBase { /** * Gets the columns to be mapped over from this job. * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return a set of columns - * @since 1.6.0 * @see #fetchColumns(Class, Configuration, Collection) */ - public static Set> getFetchedColumns(Class implementingClass, + public static Set getFetchedColumns(Class implementingClass, Configuration conf) { checkArgument(conf != null, "conf is null"); String confValue = conf.get(enumToConfKey(implementingClass, ScanOpts.COLUMNS)); @@ -371,8 +347,9 @@ public class InputConfigurator extends ConfiguratorBase { return deserializeFetchedColumns(serialized); } - public static Set> deserializeFetchedColumns(Collection serialized) { - Set> columns = new HashSet<>(); + public static Set deserializeFetchedColumns( + Collection serialized) { + Set columns = new HashSet<>(); if (null == serialized) { return columns; @@ -383,47 +360,40 @@ public class InputConfigurator extends ConfiguratorBase { Text cf = new Text(idx < 0 ? Base64.getDecoder().decode(col) : Base64.getDecoder().decode(col.substring(0, idx))); Text cq = idx < 0 ? null : new Text(Base64.getDecoder().decode(col.substring(idx + 1))); - columns.add(new Pair<>(cf, cq)); + columns.add(new IteratorSetting.Column(cf, cq)); } return columns; } /** - * Encode an iterator on the input for the single input table associated with this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param cfg - * the configuration of the iterator - * @throws IllegalArgumentException - * if the iterator can't be serialized into the configuration - * @since 1.6.0 + * Serialize the iterators to the hadoop configuration under one key. 
*/ - public static void addIterator(Class implementingClass, Configuration conf, - IteratorSetting cfg) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - String newIter; - try { - cfg.write(new DataOutputStream(baos)); - newIter = Base64.getEncoder().encodeToString(baos.toByteArray()); - baos.close(); - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize IteratorSetting"); - } - + public static void writeIteratorsToConf(Class implementingClass, Configuration conf, + Collection iterators) { String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS); - String iterators = conf.get(confKey); - // No iterators specified yet, create a new string - if (iterators == null || iterators.isEmpty()) { - iterators = newIter; - } else { - // append the next iterator & reset - iterators = iterators.concat(StringUtils.COMMA_STR + newIter); + StringBuilder iterBuilder = new StringBuilder(); + int count = 0; + for (IteratorSetting cfg : iterators) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + String newIter; + try { + cfg.write(new DataOutputStream(baos)); + newIter = Base64.getEncoder().encodeToString(baos.toByteArray()); + baos.close(); + } catch (IOException e) { + throw new IllegalArgumentException("unable to serialize IteratorSetting"); + } + + if (count == 0) { + iterBuilder.append(newIter); + } else { + // append the next iterator & reset + iterBuilder.append(StringUtils.COMMA_STR + newIter); + } + count++; } // Store the iterators w/ the job - conf.set(confKey, iterators); + conf.set(confKey, iterBuilder.toString()); } /** @@ -812,7 +782,7 @@ public class InputConfigurator extends ConfiguratorBase { List itrs = getIterators(implementingClass, conf); if (itrs != null) queryConfig.setIterators(itrs); - Set> columns = getFetchedColumns(implementingClass, conf); + Set columns = getFetchedColumns(implementingClass, conf); if (columns != null) queryConfig.fetchColumns(columns); List ranges = null; diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java index 02c669b..772edc1 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java @@ -16,10 +16,11 @@ */ package org.apache.accumulo.hadoopImpl.mapreduce.lib; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.hadoop.mapreduce.OutputInfo; import org.apache.hadoop.mapreduce.Job; import com.beust.jcommander.Parameter; @@ -37,16 +38,13 @@ public class MapReduceClientOnDefaultTable extends MapReduceClientOpts { } @Override - public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { - super.setAccumuloConfigs(job); + public void setAccumuloConfigs(Job job) { final String tableName = getTableName(); final ClientInfo info = getClientInfo(); - AccumuloInputFormat.setClientInfo(job, info); - AccumuloInputFormat.setInputTableName(job, tableName); - AccumuloInputFormat.setScanAuthorizations(job, auths); - AccumuloOutputFormat.setClientInfo(job, 
info); - AccumuloOutputFormat.setCreateTables(job, true); - AccumuloOutputFormat.setDefaultTableName(job, tableName); + AccumuloInputFormat.setInfo(job, + InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build()); + AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info) + .defaultTableName(tableName).enableCreateTables().build()); } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java index b061ab8..e6c91db 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java @@ -16,9 +16,11 @@ */ package org.apache.accumulo.hadoopImpl.mapreduce.lib; -import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.hadoop.mapreduce.OutputInfo; import org.apache.hadoop.mapreduce.Job; import com.beust.jcommander.Parameter; @@ -29,12 +31,13 @@ public class MapReduceClientOnRequiredTable extends MapReduceClientOpts { private String tableName; @Override - public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { - super.setAccumuloConfigs(job); - AccumuloInputFormat.setInputTableName(job, getTableName()); - AccumuloInputFormat.setScanAuthorizations(job, auths); - AccumuloOutputFormat.setCreateTables(job, true); - AccumuloOutputFormat.setDefaultTableName(job, getTableName()); + public void setAccumuloConfigs(Job job) { + final String tableName = getTableName(); + final ClientInfo info = getClientInfo(); + AccumuloInputFormat.setInfo(job, + InputInfo.builder().clientInfo(info).table(tableName).scanAuths(auths).build()); + AccumuloOutputFormat.setInfo(job, OutputInfo.builder().clientInfo(info) + .defaultTableName(tableName).enableCreateTables().build()); } public String getTableName() { diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java index b1a76e0..ebf5d5d 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java @@ -24,8 +24,6 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.security.SystemPermission; -import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -34,13 +32,10 @@ import org.slf4j.LoggerFactory; /** * Adds some MR awareness to the ClientOpts */ -public class MapReduceClientOpts extends ClientOpts { +public abstract class MapReduceClientOpts extends ClientOpts { private static final Logger log = 
LoggerFactory.getLogger(MapReduceClientOpts.class); - public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { - AccumuloInputFormat.setClientInfo(job, this.getClientInfo()); - AccumuloOutputFormat.setClientInfo(job, this.getClientInfo()); - } + public abstract void setAccumuloConfigs(Job job) throws AccumuloSecurityException; @Override public AuthenticationToken getToken() { diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/package-info.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/package-info.java deleted file mode 100644 index 5040875..0000000 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/package-info.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package exists to store common helpers for configuring MapReduce jobs in a single location. - * It contains static configurator methods, stored in classes separate from the things they - * configure (typically, {@link org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat}/ - * {@link org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat} and related classes in - * compatible frameworks), rather than storing them in those InputFormats/OutputFormats, so as not - * to clutter their API with methods that don't match the conventions for that framework. These - * classes may be useful to input/output plugins for other frameworks, so they can reuse the same - * configuration options and/or serialize them into a {@link org.apache.hadoop.conf.Configuration} - * instance in a standard way. - * - *

    - * It is not expected these will change much (except when new features are added), but end users - * should not use these classes. They should use the static configurators on the - * {@link org.apache.hadoop.mapreduce.InputFormat} or - * {@link org.apache.hadoop.mapreduce.OutputFormat} they are configuring, which in turn may use - * these classes to implement their own static configurators. Once again, these classes are intended - * for internal use, but may be useful to developers of plugins for other frameworks that read/write - * to Accumulo. - * - * @since 1.6.0 - */ -package org.apache.accumulo.hadoopImpl.mapreduce.lib; diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/KeyRangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/KeyRangePartitioner.java similarity index 97% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/KeyRangePartitioner.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/KeyRangePartitioner.java index 0739882..af36f59 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/KeyRangePartitioner.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/KeyRangePartitioner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapreduce.lib.partition; +package org.apache.accumulo.hadoopImpl.mapreduce.lib.partition; import org.apache.accumulo.core.data.Key; import org.apache.hadoop.conf.Configurable; diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitioner.java similarity index 98% rename from hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitioner.java rename to hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitioner.java index 9cb744f..ddc3708 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitioner.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitioner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce.lib.partition; +package org.apache.accumulo.hadoopImpl.mapreduce.lib.partition; import static java.nio.charset.StandardCharsets.UTF_8; diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java index 16d2d8f..7e1db66 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloFileOutputFormatTest.java @@ -31,7 +31,9 @@ import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.hadoop.mapreduce.FileOutputInfo; import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.junit.Test; @@ -55,13 +57,10 @@ public class AccumuloFileOutputFormatTest { .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build(); JobConf job = new JobConf(); - AccumuloFileOutputFormat.setReplication(job, a); - AccumuloFileOutputFormat.setFileBlockSize(job, b); - AccumuloFileOutputFormat.setDataBlockSize(job, c); - AccumuloFileOutputFormat.setIndexBlockSize(job, d); - AccumuloFileOutputFormat.setCompressionType(job, e); - AccumuloFileOutputFormat.setSampler(job, samplerConfig); - AccumuloFileOutputFormat.setSummarizers(job, sc1, sc2); + AccumuloFileOutputFormat.setInfo(job, + FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b) + .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig) + .summarizers(sc1, sc2).build()); AccumuloConfiguration acuconf = FileOutputConfigurator .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job); @@ -90,12 +89,9 @@ public class AccumuloFileOutputFormatTest { samplerConfig.addOption("modulus", "100003"); job = new JobConf(); - AccumuloFileOutputFormat.setReplication(job, a); - AccumuloFileOutputFormat.setFileBlockSize(job, b); - AccumuloFileOutputFormat.setDataBlockSize(job, c); - AccumuloFileOutputFormat.setIndexBlockSize(job, d); - AccumuloFileOutputFormat.setCompressionType(job, e); - AccumuloFileOutputFormat.setSampler(job, samplerConfig); + AccumuloFileOutputFormat.setInfo(job, + FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b) + .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build()); acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java index 7b0a3da..e204e8f 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java @@ -16,19 +16,34 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; 
import java.io.DataOutputStream; import java.io.IOException; import java.util.Base64; +import java.util.HashSet; import java.util.List; +import java.util.Properties; +import java.util.Set; +import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.iterators.system.CountingIterator; import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -45,14 +60,28 @@ public class AccumuloInputFormatTest { job = new JobConf(); } + static ClientInfo clientInfo; + + @BeforeClass + public static void setupClientInfo() { + clientInfo = createMock(ClientInfo.class); + AuthenticationToken token = createMock(AuthenticationToken.class); + Properties props = createMock(Properties.class); + expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes(); + expect(clientInfo.getProperties()).andReturn(props).anyTimes(); + replay(clientInfo); + } + /** * Check that the iterator configuration is getting stored in the Job conf correctly. */ @Test public void testSetIterator() throws IOException { - IteratorSetting is = new IteratorSetting(1, "WholeRow", - "org.apache.accumulo.core.iterators.WholeRowIterator"); - AccumuloInputFormat.addIterator(job, is); + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); + + IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class); + AccumuloInputFormat.setInfo(job, opts.addIterator(is).build()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); is.write(new DataOutputStream(baos)); String iterators = job.get("AccumuloInputFormat.ScanOpts.Iterators"); @@ -60,18 +89,19 @@ public class AccumuloInputFormatTest { } @Test - public void testAddIterator() throws IOException { - AccumuloInputFormat.addIterator(job, - new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", - "org.apache.accumulo.core.iterators.VersioningIterator")); - IteratorSetting iter = new IteratorSetting(3, "Count", - "org.apache.accumulo.core.iterators.CountingIterator"); - iter.addOption("v1", "1"); - iter.addOption("junk", "\0omg:!\\xyzzy"); - AccumuloInputFormat.addIterator(job, iter); - - List list = AccumuloInputFormat.getIterators(job); + public void testAddIterator() { + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); + + IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class); + IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class); + IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class); + iter3.addOption("v1", "1"); + iter3.addOption("junk", "\0omg:!\\xyzzy"); + AccumuloInputFormat.setInfo(job, + 
opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build()); + + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, job); // Check the list size assertEquals(3, list.size()); @@ -79,21 +109,19 @@ public class AccumuloInputFormatTest { // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", - setting.getIteratorClass()); + assertEquals(WholeRowIterator.class.getName(), setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); assertEquals(0, setting.getOptions().size()); setting = list.get(1); assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", - setting.getIteratorClass()); + assertEquals(VersioningIterator.class.getName(), setting.getIteratorClass()); assertEquals("Versions", setting.getName()); assertEquals(0, setting.getOptions().size()); setting = list.get(2); assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals(CountingIterator.class.getName(), setting.getIteratorClass()); assertEquals("Count", setting.getName()); assertEquals(2, setting.getOptions().size()); assertEquals("1", setting.getOptions().get("v1")); @@ -112,20 +140,23 @@ public class AccumuloInputFormatTest { public void testIteratorOptionEncoding() throws Throwable { String key = "colon:delimited:key"; String value = "comma,delimited,value"; - IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); - someSetting.addOption(key, value); - AccumuloInputFormat.addIterator(job, someSetting); - - List list = AccumuloInputFormat.getIterators(job); + IteratorSetting iter1 = new IteratorSetting(1, "iter1", WholeRowIterator.class); + iter1.addOption(key, value); + // also test if reusing options will create duplicate iterators + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); + AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build()); + + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, job); assertEquals(1, list.size()); assertEquals(1, list.get(0).getOptions().size()); assertEquals(list.get(0).getOptions().get(key), value); - someSetting.addOption(key + "2", value); - someSetting.setPriority(2); - someSetting.setName("it2"); - AccumuloInputFormat.addIterator(job, someSetting); - list = AccumuloInputFormat.getIterators(job); + IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class); + iter2.addOption(key, value); + iter2.addOption(key + "2", value); + AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build()); + list = InputConfigurator.getIterators(AccumuloInputFormat.class, job); assertEquals(2, list.size()); assertEquals(1, list.get(0).getOptions().size()); assertEquals(list.get(0).getOptions().get(key), value); @@ -138,15 +169,15 @@ public class AccumuloInputFormatTest { * Test getting iterator settings for multiple iterators set */ @Test - public void testGetIteratorSettings() throws IOException { - AccumuloInputFormat.addIterator(job, - new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", - "org.apache.accumulo.core.iterators.VersioningIterator")); - 
AccumuloInputFormat.addIterator(job, - new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); + public void testGetIteratorSettings() { + IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName()); + IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName()); + IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName()); + AccumuloInputFormat.setInfo(job, + InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY) + .addIterator(iter1).addIterator(iter2).addIterator(iter3).build()); - List list = AccumuloInputFormat.getIterators(job); + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, job); // Check the list size assertEquals(3, list.size()); @@ -154,31 +185,45 @@ public class AccumuloInputFormatTest { // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); + assertEquals(WholeRowIterator.class.getName(), setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); setting = list.get(1); assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", - setting.getIteratorClass()); + assertEquals(VersioningIterator.class.getName(), setting.getIteratorClass()); assertEquals("Versions", setting.getName()); setting = list.get(2); assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals(CountingIterator.class.getName(), setting.getIteratorClass()); assertEquals("Count", setting.getName()); } @Test - public void testSetRegex() throws IOException { + public void testSetRegex() { String regex = ">\"*%<>\'\\"; IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); RegExFilter.setRegexs(is, regex, null, null, null, false); - AccumuloInputFormat.addIterator(job, is); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test") + .scanAuths(Authorizations.EMPTY).addIterator(is).build()); - assertEquals(regex, AccumuloInputFormat.getIterators(job).get(0).getName()); + assertEquals(regex, + InputConfigurator.getIterators(AccumuloInputFormat.class, job).get(0).getName()); } + @Test + public void testEmptyColumnFamily() throws IOException { + Set cols = new HashSet<>(); + cols.add(new IteratorSetting.Column(new Text(""), null)); + cols.add(new IteratorSetting.Column(new Text("foo"), new Text("bar"))); + cols.add(new IteratorSetting.Column(new Text(""), new Text("bar"))); + cols.add(new IteratorSetting.Column(new Text(""), new Text(""))); + cols.add(new IteratorSetting.Column(new Text("foo"), new Text(""))); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test") + .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build()); + + assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job)); + } } diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormatTest.java deleted file mode 100644 index cc925c1..0000000 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloMultiTableInputFormatTest.java +++ /dev/null 
@@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.hadoop.mapred; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.hadoop.mapreduce.InputTableConfig; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -public class AccumuloMultiTableInputFormatTest { - - @Rule - public TestName testName = new TestName(); - - /** - * Verify {@link org.apache.accumulo.hadoop.mapreduce.InputTableConfig} objects get correctly - * serialized in the JobContext. - */ - @Test - public void testTableQueryConfigSerialization() throws IOException { - String table1Name = testName.getMethodName() + "1"; - String table2Name = testName.getMethodName() + "2"; - JobConf job = new JobConf(); - - InputTableConfig table1 = new InputTableConfig() - .setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair<>(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - InputTableConfig table2 = new InputTableConfig() - .setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair<>(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - Map configMap = new HashMap<>(); - configMap.put(table1Name, table1); - configMap.put(table2Name, table2); - AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap); - - assertEquals(table1, AccumuloMultiTableInputFormat.getInputTableConfig(job, table1Name)); - assertEquals(table2, AccumuloMultiTableInputFormat.getInputTableConfig(job, table2Name)); - } -} diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java index 716f4e1..5811ebc 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java @@ -16,21 +16,34 @@ */ package org.apache.accumulo.hadoop.mapred; +import static org.apache.accumulo.hadoopImpl.mapred.AccumuloOutputFormatImpl.getBatchWriterOptions; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static 
org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import java.io.IOException; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.hadoop.mapreduce.OutputInfo; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; import org.junit.Test; public class AccumuloOutputFormatTest { - @Test public void testBWSettings() throws IOException { + ClientInfo clientInfo = createMock(ClientInfo.class); + AuthenticationToken token = createMock(AuthenticationToken.class); + Properties props = createMock(Properties.class); + expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes(); + expect(clientInfo.getProperties()).andReturn(props).anyTimes(); + replay(clientInfo); JobConf job = new JobConf(); // make sure we aren't testing defaults @@ -45,7 +58,8 @@ public class AccumuloOutputFormatTest { bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS); bwConfig.setMaxWriteThreads(42); bwConfig.setMaxMemory(1123581321L); - AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + AccumuloOutputFormat.setInfo(job, + OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build()); AccumuloOutputFormat myAOF = new AccumuloOutputFormat() { @Override diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormatIT.java new file mode 100644 index 0000000..5d001fb --- /dev/null +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormatIT.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoop.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.accumulo.hadoop.mapreduce.InputInfo; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the new MR API in the hadoop-mareduce package. + * + * @since 2.0 + */ +public class AccumuloRowInputFormatIT extends AccumuloClusterHarness { + + private static final String ROW1 = "row1"; + private static final String ROW2 = "row2"; + private static final String ROW3 = "row3"; + private static final String COLF1 = "colf1"; + private static List> row1; + private static List> row2; + private static List> row3; + private static AssertionError e1 = null; + private static AssertionError e2 = null; + + @BeforeClass + public static void prepareRows() { + row1 = new ArrayList<>(); + row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes())); + row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes())); + row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes())); + row2 = new ArrayList<>(); + row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes())); + row3 = new ArrayList<>(); + row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes())); + } + + private static void checkLists(final List> first, + final Iterator> second) { + int entryIndex = 0; + while (second.hasNext()) { + final Entry entry = second.next(); + assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey()); + assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue()); + entryIndex++; + } + } + + private static void insertList(final BatchWriter writer, final List> list) + throws MutationsRejectedException { + for (Entry e : list) { + final Key key = e.getKey(); + final Mutation mutation = new Mutation(key.getRow()); + ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility()); + mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, + key.getTimestamp(), e.getValue()); + writer.addMutation(mutation); + } + } + + private static class MRTester extends Configured implements Tool { + public static class 
TestMapper + implements Mapper>,Key,Value> { + int count = 0; + + @Override + public void map(Text k, PeekingIterator> v, + OutputCollector output, Reporter reporter) throws IOException { + try { + switch (count) { + case 0: + assertEquals("Current key should be " + ROW1, new Text(ROW1), k); + checkLists(row1, v); + break; + case 1: + assertEquals("Current key should be " + ROW2, new Text(ROW2), k); + checkLists(row2, v); + break; + case 2: + assertEquals("Current key should be " + ROW3, new Text(ROW3), k); + checkLists(row3, v); + break; + default: + fail(); + } + } catch (AssertionError e) { + e1 = e; + } + count++; + } + + @Override + public void configure(JobConf job) {} + + @Override + public void close() throws IOException { + try { + assertEquals(3, count); + } catch (AssertionError e) { + e2 = e; + } + } + + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 1) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " "); + } + + String table = args[0]; + + JobConf job = new JobConf(getConf()); + job.setJarByClass(this.getClass()); + + job.setInputFormat(AccumuloRowInputFormat.class); + + AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()) + .table(table).scanAuths(Authorizations.EMPTY).build()); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormat(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + return JobClient.runJob(job).isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + conf.set("mapreduce.framework.name", "local"); + conf.set("mapreduce.cluster.local.dir", + new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath()); + assertEquals(0, ToolRunner.run(conf, new MRTester(), args)); + } + } + + @Test + public void test() throws Exception { + try (AccumuloClient client = getAccumuloClient()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + BatchWriter writer = null; + try { + writer = client.createBatchWriter(tableName, new BatchWriterConfig()); + insertList(writer, row1); + insertList(writer, row2); + insertList(writer, row3); + } finally { + if (writer != null) { + writer.close(); + } + } + MRTester.main(new String[] {tableName}); + assertNull(e1); + assertNull(e2); + } + } +} diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java index 9eb3e38..ea74826 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloFileOutputFormatTest.java @@ -32,13 +32,14 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.hadoopImpl.mapreduce.lib.FileOutputConfigurator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; public class AccumuloFileOutputFormatTest { @Test - public void validateConfiguration() throws IOException, InterruptedException { + public void validateConfiguration() throws IOException { int a = 7; long b = 300L; @@ -55,13 +56,10 @@ public 
class AccumuloFileOutputFormatTest { .addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build(); Job job1 = Job.getInstance(); - AccumuloFileOutputFormat.setReplication(job1, a); - AccumuloFileOutputFormat.setFileBlockSize(job1, b); - AccumuloFileOutputFormat.setDataBlockSize(job1, c); - AccumuloFileOutputFormat.setIndexBlockSize(job1, d); - AccumuloFileOutputFormat.setCompressionType(job1, e); - AccumuloFileOutputFormat.setSampler(job1, samplerConfig); - AccumuloFileOutputFormat.setSummarizers(job1, sc1, sc2); + AccumuloFileOutputFormat.setInfo(job1, + FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b) + .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig) + .summarizers(sc1, sc2).build()); AccumuloConfiguration acuconf = FileOutputConfigurator .getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration()); @@ -90,12 +88,9 @@ public class AccumuloFileOutputFormatTest { samplerConfig.addOption("modulus", "100003"); Job job2 = Job.getInstance(); - AccumuloFileOutputFormat.setReplication(job2, a); - AccumuloFileOutputFormat.setFileBlockSize(job2, b); - AccumuloFileOutputFormat.setDataBlockSize(job2, c); - AccumuloFileOutputFormat.setIndexBlockSize(job2, d); - AccumuloFileOutputFormat.setCompressionType(job2, e); - AccumuloFileOutputFormat.setSampler(job2, samplerConfig); + AccumuloFileOutputFormat.setInfo(job2, + FileOutputInfo.builder().outputPath(new Path("somewhere")).replication(a).fileBlockSize(b) + .dataBlockSize(c).indexBlockSize(d).compressionType(e).sampler(samplerConfig).build()); acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job2.getConfiguration()); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java index 18fc46e..4de275b 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java @@ -16,6 +16,9 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; @@ -24,18 +27,36 @@ import java.io.IOException; import java.util.Base64; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; +import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.iterators.system.CountingIterator; import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.junit.BeforeClass; import org.junit.Test; public class AccumuloInputFormatTest { + static ClientInfo clientInfo; + + @BeforeClass + public static void setupClientInfo() { + 
clientInfo = createMock(ClientInfo.class); + AuthenticationToken token = createMock(AuthenticationToken.class); + Properties props = createMock(Properties.class); + expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes(); + expect(clientInfo.getProperties()).andReturn(props).anyTimes(); + replay(clientInfo); + } /** * Check that the iterator configuration is getting stored in the Job conf correctly. @@ -43,10 +64,11 @@ public class AccumuloInputFormatTest { @Test public void testSetIterator() throws IOException { Job job = Job.getInstance(); + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); - IteratorSetting is = new IteratorSetting(1, "WholeRow", - "org.apache.accumulo.core.iterators.WholeRowIterator"); - AccumuloInputFormat.addIterator(job, is); + IteratorSetting is = new IteratorSetting(1, "WholeRow", WholeRowIterator.class); + AccumuloInputFormat.setInfo(job, opts.addIterator(is).build()); Configuration conf = job.getConfiguration(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); is.write(new DataOutputStream(baos)); @@ -57,18 +79,19 @@ public class AccumuloInputFormatTest { @Test public void testAddIterator() throws IOException { Job job = Job.getInstance(); + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); - AccumuloInputFormat.addIterator(job, - new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", - "org.apache.accumulo.core.iterators.VersioningIterator")); - IteratorSetting iter = new IteratorSetting(3, "Count", - "org.apache.accumulo.core.iterators.CountingIterator"); - iter.addOption("v1", "1"); - iter.addOption("junk", "\0omg:!\\xyzzy"); - AccumuloInputFormat.addIterator(job, iter); + IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class); + IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class); + IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class); + iter3.addOption("v1", "1"); + iter3.addOption("junk", "\0omg:!\\xyzzy"); + AccumuloInputFormat.setInfo(job, + opts.addIterator(iter1).addIterator(iter2).addIterator(iter3).build()); - List list = AccumuloInputFormat.getIterators(job); + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, + job.getConfiguration()); // Check the list size assertEquals(3, list.size()); @@ -76,21 +99,19 @@ public class AccumuloInputFormatTest { // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", - setting.getIteratorClass()); + assertEquals(WholeRowIterator.class.getName(), setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); assertEquals(0, setting.getOptions().size()); setting = list.get(1); assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", - setting.getIteratorClass()); + assertEquals(VersioningIterator.class.getName(), setting.getIteratorClass()); assertEquals("Versions", setting.getName()); assertEquals(0, setting.getOptions().size()); setting = list.get(2); assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + 
assertEquals(CountingIterator.class.getName(), setting.getIteratorClass()); assertEquals("Count", setting.getName()); assertEquals(2, setting.getOptions().size()); assertEquals("1", setting.getOptions().get("v1")); @@ -109,21 +130,25 @@ public class AccumuloInputFormatTest { public void testIteratorOptionEncoding() throws Throwable { String key = "colon:delimited:key"; String value = "comma,delimited,value"; - IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); - someSetting.addOption(key, value); + IteratorSetting iter1 = new IteratorSetting(1, "iter1", WholeRowIterator.class); + iter1.addOption(key, value); Job job = Job.getInstance(); - AccumuloInputFormat.addIterator(job, someSetting); + // also test if reusing options will create duplicate iterators + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder().clientInfo(clientInfo) + .table("test").scanAuths(Authorizations.EMPTY); + AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).build()); - List list = AccumuloInputFormat.getIterators(job); + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, + job.getConfiguration()); assertEquals(1, list.size()); assertEquals(1, list.get(0).getOptions().size()); assertEquals(list.get(0).getOptions().get(key), value); - someSetting.addOption(key + "2", value); - someSetting.setPriority(2); - someSetting.setName("it2"); - AccumuloInputFormat.addIterator(job, someSetting); - list = AccumuloInputFormat.getIterators(job); + IteratorSetting iter2 = new IteratorSetting(1, "iter2", WholeRowIterator.class); + iter2.addOption(key, value); + iter2.addOption(key + "2", value); + AccumuloInputFormat.setInfo(job, opts.addIterator(iter1).addIterator(iter2).build()); + list = InputConfigurator.getIterators(AccumuloInputFormat.class, job.getConfiguration()); assertEquals(2, list.size()); assertEquals(1, list.get(0).getOptions().size()); assertEquals(list.get(0).getOptions().get(key), value); @@ -139,14 +164,15 @@ public class AccumuloInputFormatTest { public void testGetIteratorSettings() throws IOException { Job job = Job.getInstance(); - AccumuloInputFormat.addIterator(job, - new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); - AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", - "org.apache.accumulo.core.iterators.VersioningIterator")); - AccumuloInputFormat.addIterator(job, - new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); + IteratorSetting iter1 = new IteratorSetting(1, "WholeRow", WholeRowIterator.class.getName()); + IteratorSetting iter2 = new IteratorSetting(2, "Versions", VersioningIterator.class.getName()); + IteratorSetting iter3 = new IteratorSetting(3, "Count", CountingIterator.class.getName()); + AccumuloInputFormat.setInfo(job, + InputInfo.builder().clientInfo(clientInfo).table("test").scanAuths(Authorizations.EMPTY) + .addIterator(iter1).addIterator(iter2).addIterator(iter3).build()); - List list = AccumuloInputFormat.getIterators(job); + List list = InputConfigurator.getIterators(AccumuloInputFormat.class, + job.getConfiguration()); // Check the list size assertEquals(3, list.size()); @@ -154,18 +180,17 @@ public class AccumuloInputFormatTest { // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); + assertEquals(WholeRowIterator.class.getName(), 
setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); setting = list.get(1); assertEquals(2, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", - setting.getIteratorClass()); + assertEquals(VersioningIterator.class.getName(), setting.getIteratorClass()); assertEquals("Versions", setting.getName()); setting = list.get(2); assertEquals(3, setting.getPriority()); - assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); + assertEquals(CountingIterator.class.getName(), setting.getIteratorClass()); assertEquals("Count", setting.getName()); } @@ -178,22 +203,26 @@ public class AccumuloInputFormatTest { IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); RegExFilter.setRegexs(is, regex, null, null, null, false); - AccumuloInputFormat.addIterator(job, is); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test") + .scanAuths(Authorizations.EMPTY).addIterator(is).build()); - assertEquals(regex, AccumuloInputFormat.getIterators(job).get(0).getName()); + assertEquals(regex, InputConfigurator + .getIterators(AccumuloInputFormat.class, job.getConfiguration()).get(0).getName()); } @Test public void testEmptyColumnFamily() throws IOException { Job job = Job.getInstance(); - Set> cols = new HashSet<>(); - cols.add(new Pair<>(new Text(""), null)); - cols.add(new Pair<>(new Text("foo"), new Text("bar"))); - cols.add(new Pair<>(new Text(""), new Text("bar"))); - cols.add(new Pair<>(new Text(""), new Text(""))); - cols.add(new Pair<>(new Text("foo"), new Text(""))); - AccumuloInputFormat.fetchColumns(job, cols); - Set> setCols = AccumuloInputFormat.getFetchedColumns(job); - assertEquals(cols, setCols); + Set cols = new HashSet<>(); + cols.add(new IteratorSetting.Column(new Text(""), null)); + cols.add(new IteratorSetting.Column(new Text("foo"), new Text("bar"))); + cols.add(new IteratorSetting.Column(new Text(""), new Text("bar"))); + cols.add(new IteratorSetting.Column(new Text(""), new Text(""))); + cols.add(new IteratorSetting.Column(new Text("foo"), new Text(""))); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(clientInfo).table("test") + .scanAuths(Authorizations.EMPTY).fetchColumns(cols).build()); + + assertEquals(cols, + InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration())); } } diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormatTest.java deleted file mode 100644 index 18b1b6b..0000000 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloMultiTableInputFormatTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.hadoop.mapreduce; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -public class AccumuloMultiTableInputFormatTest { - - @Rule - public TestName testName = new TestName(); - - /** - * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext. - */ - @Test - public void testInputTableConfigSerialization() throws IOException { - String table1 = testName.getMethodName() + "1"; - String table2 = testName.getMethodName() + "2"; - Job job = Job.getInstance(); - - InputTableConfig tableConfig = new InputTableConfig() - .setRanges(Collections.singletonList(new Range("a", "b"))) - .fetchColumns(Collections.singleton(new Pair<>(new Text("CF1"), new Text("CQ1")))) - .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1"))); - - Map configMap = new HashMap<>(); - configMap.put(table1, tableConfig); - configMap.put(table2, tableConfig); - - AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap); - - assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, table1)); - assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, table2)); - } - -} diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java index 687d276..e698b3a 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java @@ -16,13 +16,20 @@ */ package org.apache.accumulo.hadoop.mapreduce; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import java.io.IOException; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.hadoopImpl.mapreduce.AccumuloOutputFormatImpl; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.junit.Test; @@ -31,6 +38,12 @@ public class AccumuloOutputFormatTest { @Test public void testBWSettings() throws IOException { + ClientInfo clientInfo = createMock(ClientInfo.class); + AuthenticationToken token = createMock(AuthenticationToken.class); + Properties props = createMock(Properties.class); + expect(clientInfo.getAuthenticationToken()).andReturn(token).anyTimes(); + expect(clientInfo.getProperties()).andReturn(props).anyTimes(); + replay(clientInfo); Job job = Job.getInstance(); // make sure we aren't testing defaults @@ -45,12 +58,13 @@ public class AccumuloOutputFormatTest { bwConfig.setTimeout(9898989L, 
TimeUnit.MILLISECONDS); bwConfig.setMaxWriteThreads(42); bwConfig.setMaxMemory(1123581321L); - AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + AccumuloOutputFormat.setInfo(job, + OutputInfo.builder().clientInfo(clientInfo).batchWriterOptions(bwConfig).build()); AccumuloOutputFormat myAOF = new AccumuloOutputFormat() { @Override public void checkOutputSpecs(JobContext job) throws IOException { - BatchWriterConfig bwOpts = getBatchWriterOptions(job); + BatchWriterConfig bwOpts = AccumuloOutputFormatImpl.getBatchWriterOptions(job); // passive check assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormatIT.java new file mode 100644 index 0000000..0d5f20b --- /dev/null +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloRowInputFormatIT.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.hadoop.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the new MR API in the hadoop-mareduce package. 
+ * + * @since 2.0 + */ +public class AccumuloRowInputFormatIT extends AccumuloClusterHarness { + + private static final String ROW1 = "row1"; + private static final String ROW2 = "row2"; + private static final String ROW3 = "row3"; + private static final String COLF1 = "colf1"; + private static List<Entry<Key,Value>> row1; + private static List<Entry<Key,Value>> row2; + private static List<Entry<Key,Value>> row3; + private static AssertionError e1 = null; + private static AssertionError e2 = null; + + @BeforeClass + public static void prepareRows() { + row1 = new ArrayList<>(); + row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes())); + row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes())); + row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes())); + row2 = new ArrayList<>(); + row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes())); + row3 = new ArrayList<>(); + row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes())); + } + + private static void checkLists(final List<Entry<Key,Value>> first, + final Iterator<Entry<Key,Value>> second) { + int entryIndex = 0; + while (second.hasNext()) { + final Entry<Key,Value> entry = second.next(); + assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey()); + assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue()); + entryIndex++; + } + } + + private static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) + throws MutationsRejectedException { + for (Entry<Key,Value> e : list) { + final Key key = e.getKey(); + final Mutation mutation = new Mutation(key.getRow()); + ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility()); + mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, + key.getTimestamp(), e.getValue()); + writer.addMutation(mutation); + } + } + + private static class MRTester extends Configured implements Tool { + private static class TestMapper + extends Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> { + int count = 0; + + @Override + protected void map(Text k, PeekingIterator<Entry<Key,Value>> v, Context context) + throws IOException, InterruptedException { + try { + switch (count) { + case 0: + assertEquals("Current key should be " + ROW1, new Text(ROW1), k); + checkLists(row1, v); + break; + case 1: + assertEquals("Current key should be " + ROW2, new Text(ROW2), k); + checkLists(row2, v); + break; + case 2: + assertEquals("Current key should be " + ROW3, new Text(ROW3), k); + checkLists(row3, v); + break; + default: + fail(); + } + } catch (AssertionError e) { + e1 = e; + } + count++; + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + assertEquals(3, count); + } catch (AssertionError e) { + e2 = e; + } + } + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 1) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + "
    "); + } + + String table = args[0]; + + Job job = Job.getInstance(getConf(), + this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloRowInputFormat.class); + + AccumuloRowInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()) + .table(table).scanAuths(Authorizations.EMPTY).build()); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormatClass(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + conf.set("mapreduce.framework.name", "local"); + conf.set("mapreduce.cluster.local.dir", + new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath()); + assertEquals(0, ToolRunner.run(conf, new MRTester(), args)); + } + } + + @Test + public void test() throws Exception { + try (AccumuloClient client = getAccumuloClient()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + BatchWriter writer = null; + try { + writer = client.createBatchWriter(tableName, new BatchWriterConfig()); + insertList(writer, row1); + insertList(writer, row2); + insertList(writer, row3); + } finally { + if (writer != null) { + writer.close(); + } + } + MRTester.main(new String[] {tableName}); + assertNull(e1); + assertNull(e2); + } + } +} diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/NewAccumuloInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/NewAccumuloInputFormatIT.java new file mode 100644 index 0000000..8e2f89e --- /dev/null +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/NewAccumuloInputFormatIT.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.hadoop.mapreduce; + +import static java.lang.System.currentTimeMillis; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.sample.RowSampler; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.hadoopImpl.mapreduce.BatchInputSplit; +import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +/** + * Tests the new MR API in the hadoop-mareduce package. + * + * @since 2.0 + */ +public class NewAccumuloInputFormatIT extends AccumuloClusterHarness { + + AccumuloInputFormat inputFormat; + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + } + + @Before + public void before() { + inputFormat = new AccumuloInputFormat(); + } + + /** + * Tests several different paths through the getSplits() method by setting different properties + * and verifying the results. 
+ */ + @Test + public void testGetSplits() throws Exception { + AccumuloClient client = getAccumuloClient(); + String table = getUniqueNames(1)[0]; + client.tableOperations().create(table); + insertData(table, currentTimeMillis()); + + Job job = Job.getInstance(); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).scanIsolation().build()); + + // split table + TreeSet splitsToAdd = new TreeSet<>(); + for (int i = 0; i < 10000; i += 1000) + splitsToAdd.add(new Text(String.format("%09d", i))); + client.tableOperations().addSplits(table, splitsToAdd); + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // wait for splits to be propagated + + // get splits without setting any range + // No ranges set on the job so it'll start with -inf + Collection actualSplits = client.tableOperations().listSplits(table); + List splits = inputFormat.getSplits(job); + assertEquals(actualSplits.size() + 1, splits.size()); + + // set ranges and get splits + List ranges = new ArrayList<>(); + for (Text text : actualSplits) + ranges.add(new Range(text)); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).ranges(ranges).build()); + splits = inputFormat.getSplits(job); + assertEquals(actualSplits.size(), splits.size()); + + // offline mode + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).offlineScan().build()); + try { + inputFormat.getSplits(job); + fail("An exception should have been thrown"); + } catch (IOException e) {} + + client.tableOperations().offline(table, true); + splits = inputFormat.getSplits(job); + assertEquals(actualSplits.size(), splits.size()); + + // auto adjust ranges + ranges = new ArrayList<>(); + for (int i = 0; i < 5; i++) + // overlapping ranges + ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2))); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).ranges(ranges).offlineScan().build()); + splits = inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).disableAutoAdjustRanges().offlineScan().build()); + splits = inputFormat.getSplits(job); + assertEquals(ranges.size(), splits.size()); + + // BatchScan not available for offline scans + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).batchScan().build()); + try { + inputFormat.getSplits(job); + fail("An exception should have been thrown"); + } catch (IOException e) {} + + // table online tests + client.tableOperations().online(table, true); + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).build()); + // test for resumption of success + splits = inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + // BatchScan not available with isolated iterators + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).scanIsolation().build()); + + splits = inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + // BatchScan not available with local iterators + AccumuloInputFormat.setInfo(job, 
InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).localIterators().build()); + + splits = inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + AccumuloInputFormat.setInfo(job, InputInfo.builder().clientInfo(getClientInfo()).table(table) + .scanAuths(Authorizations.EMPTY).batchScan().build()); + + // Check we are getting back correct type pf split + splits = inputFormat.getSplits(job); + for (InputSplit split : splits) + assert (split instanceof BatchInputSplit); + + // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, + // true)` + assertEquals(2, splits.size()); + } + + private void insertData(String tableName, long ts) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + BatchWriter bw = getAccumuloClient().createBatchWriter(tableName, null); + + for (int i = 0; i < 10000; i++) { + String row = String.format("%09d", i); + + Mutation m = new Mutation(new Text(row)); + m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes())); + bw.addMutation(m); + } + bw.close(); + } + + // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to + // ensure test correctness), + // so error tests should check to see if there is at least one error (could be more depending on + // the test) rather than zero + private static Multimap assertionErrors = ArrayListMultimap.create(); + + private static class MRTester extends Configured implements Tool { + private static class TestMapper extends Mapper { + Key key = null; + int count = 0; + + @Override + protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { + String table = context.getConfiguration().get("MRTester_tableName"); + assertNotNull(table); + try { + if (key != null) + assertEquals(key.getRow().toString(), new String(v.get())); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); + } catch (AssertionError e) { + assertionErrors.put(table + "_map", e); + } + key = new Key(k); + count++; + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + String table = context.getConfiguration().get("MRTester_tableName"); + assertNotNull(table); + try { + assertEquals(100, count); + } catch (AssertionError e) { + assertionErrors.put(table + "_cleanup", e); + } + } + } + + @Override + public int run(String[] args) throws Exception { + + if (args.length != 2 && args.length != 4) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + + "
    [ ]"); + } + + String table = args[0]; + String inputFormatClassName = args[1]; + Boolean batchScan = false; + boolean sample = false; + if (args.length == 4) { + batchScan = Boolean.parseBoolean(args[2]); + sample = Boolean.parseBoolean(args[3]); + } + + assertionErrors.put(table + "_map", new AssertionError("Dummy_map")); + assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup")); + + @SuppressWarnings("unchecked") + Class> inputFormatClass = (Class>) Class + .forName(inputFormatClassName); + + Job job = Job.getInstance(getConf(), + this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + job.getConfiguration().set("MRTester_tableName", table); + + job.setInputFormatClass(inputFormatClass); + + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder() + .clientInfo(getClientInfo()).table(table).scanAuths(Authorizations.EMPTY); + if (sample) + opts = opts.samplerConfiguration(SAMPLER_CONFIG); + if (batchScan) + AccumuloInputFormat.setInfo(job, opts.batchScan().build()); + else + AccumuloInputFormat.setInfo(job, opts.build()); + + job.setMapperClass(TestMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputFormatClass(NullOutputFormat.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 0 : 1; + } + + public static int main(String[] args) throws Exception { + Configuration conf = new Configuration(); + conf.set("mapreduce.framework.name", "local"); + conf.set("mapreduce.cluster.local.dir", + new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath()); + return ToolRunner.run(conf, new MRTester(), args); + } + } + + @Test + public void testMap() throws Exception { + final String TEST_TABLE_1 = getUniqueNames(1)[0]; + + AccumuloClient c = getAccumuloClient(); + c.tableOperations().create(TEST_TABLE_1); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + assertEquals(0, + MRTester.main(new String[] {TEST_TABLE_1, AccumuloInputFormat.class.getName()})); + assertEquals(1, assertionErrors.get(TEST_TABLE_1 + "_map").size()); + assertEquals(1, assertionErrors.get(TEST_TABLE_1 + "_cleanup").size()); + } + + private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration( + RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption("modulus", "3"); + + @Test + public void testSample() throws Exception { + final String TEST_TABLE_3 = getUniqueNames(1)[0]; + + AccumuloClient c = getAccumuloClient(); + c.tableOperations().create(TEST_TABLE_3, + new NewTableConfiguration().enableSampling(SAMPLER_CONFIG)); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + assertEquals(0, MRTester + .main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "False", "True"})); + assertEquals(39, assertionErrors.get(TEST_TABLE_3 + "_map").size()); + assertEquals(2, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size()); + + assertionErrors.clear(); + 
assertEquals(0, MRTester + .main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "False", "False"})); + assertEquals(1, assertionErrors.get(TEST_TABLE_3 + "_map").size()); + assertEquals(1, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size()); + + assertionErrors.clear(); + assertEquals(0, MRTester + .main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "True", "True"})); + assertEquals(39, assertionErrors.get(TEST_TABLE_3 + "_map").size()); + assertEquals(2, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size()); + } + + @Test + public void testMapWithBatchScanner() throws Exception { + final String TEST_TABLE_2 = getUniqueNames(1)[0]; + + AccumuloClient c = getAccumuloClient(); + c.tableOperations().create(TEST_TABLE_2); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + assertEquals(0, MRTester + .main(new String[] {TEST_TABLE_2, AccumuloInputFormat.class.getName(), "True", "False"})); + assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_map").size()); + assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_cleanup").size()); + } + + @Test + public void testCorrectRangeInputSplits() throws Exception { + Job job = Job.getInstance(); + + String table = getUniqueNames(1)[0]; + Authorizations auths = new Authorizations("foo"); + Collection fetchColumns = Collections + .singleton(new IteratorSetting.Column(new Text("foo"), new Text("bar"))); + Collection> fetchColumnsText = Collections + .singleton(new Pair<>(new Text("foo"), new Text("bar"))); + boolean isolated = true, localIters = true; + + AccumuloClient accumuloClient = getAccumuloClient(); + accumuloClient.tableOperations().create(table); + + InputInfo.InputInfoBuilder.InputFormatOptions opts = InputInfo.builder() + .clientInfo(getClientInfo()).table(table).scanAuths(auths); + AccumuloInputFormat.setInfo(job, + opts.fetchColumns(fetchColumns).scanIsolation().localIterators().build()); + + AccumuloInputFormat aif = new AccumuloInputFormat(); + + List splits = aif.getSplits(job); + + assertEquals(1, splits.size()); + + InputSplit split = splits.get(0); + + assertEquals(RangeInputSplit.class, split.getClass()); + + RangeInputSplit risplit = (RangeInputSplit) split; + + assertEquals(table, risplit.getTableName()); + assertEquals(isolated, risplit.isIsolatedScan()); + assertEquals(localIters, risplit.usesLocalIterators()); + assertEquals(fetchColumnsText, risplit.getFetchedColumns()); + } + + @Test + public void testPartialInputSplitDelegationToConfiguration() throws Exception { + String table = getUniqueNames(1)[0]; + AccumuloClient c = getAccumuloClient(); + c.tableOperations().create(table); + BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + assertEquals(0, + MRTester.main(new String[] {table, EmptySplitsAccumuloInputFormat.class.getName()})); + assertEquals(1, assertionErrors.get(table + "_map").size()); + assertEquals(1, assertionErrors.get(table + "_cleanup").size()); + } + + /** + * AccumuloInputFormat which returns an "empty" RangeInputSplit + */ + public static class EmptySplitsAccumuloInputFormat 
extends AccumuloInputFormat { + + @Override + public List getSplits(JobContext context) throws IOException { + List oldSplits = super.getSplits(context); + List newSplits = new ArrayList<>(oldSplits.size()); + + // Copy only the necessary information + for (InputSplit oldSplit : oldSplits) { + // @formatter:off + RangeInputSplit newSplit = + new RangeInputSplit( + (RangeInputSplit) oldSplit); + // @formatter:on + newSplits.add(newSplit); + } + + return newSplits; + } + } +} diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/RangeInputSplitTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplitTest.java similarity index 90% rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/RangeInputSplitTest.java rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplitTest.java index e7988d9..266c8a3 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/RangeInputSplitTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapred/RangeInputSplitTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapred; +package org.apache.accumulo.hadoopImpl.mapred; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -35,9 +35,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; import org.junit.Test; import com.google.common.collect.ImmutableMap; @@ -68,10 +66,10 @@ public class RangeInputSplitTest { RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[] {"localhost"}); - Set> fetchedColumns = new HashSet<>(); + Set fetchedColumns = new HashSet<>(); - fetchedColumns.add(new Pair<>(new Text("colf1"), new Text("colq1"))); - fetchedColumns.add(new Pair<>(new Text("colf2"), new Text("colq2"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf1"), new Text("colq1"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf2"), new Text("colq2"))); // Fake some iterators ArrayList iterators = new ArrayList<>(); @@ -88,7 +86,6 @@ public class RangeInputSplitTest { split.setUsesLocalIterators(true); split.setFetchedColumns(fetchedColumns); split.setIterators(iterators); - split.setLogLevel(Level.WARN); split.setExecutionHints(ImmutableMap.of("priority", "9")); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -109,7 +106,6 @@ public class RangeInputSplitTest { assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators()); assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); assertEquals(split.getIterators(), newSplit.getIterators()); - assertEquals(split.getLogLevel(), newSplit.getLogLevel()); assertEquals(split.getExecutionHints(), newSplit.getExecutionHints()); } diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplitTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplitTest.java index 27509aa..7c3d6e3 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplitTest.java +++ 
b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/BatchInputSplitTest.java @@ -38,9 +38,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; import org.junit.Test; public class BatchInputSplitTest { @@ -73,10 +71,10 @@ public class BatchInputSplitTest { BatchInputSplit split = new BatchInputSplit("table", Table.ID.of("1"), ranges, new String[] {"localhost"}); - Set> fetchedColumns = new HashSet<>(); + Set fetchedColumns = new HashSet<>(); - fetchedColumns.add(new Pair<>(new Text("colf1"), new Text("colq1"))); - fetchedColumns.add(new Pair<>(new Text("colf2"), new Text("colq2"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf1"), new Text("colq1"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf2"), new Text("colq2"))); // Fake some iterators ArrayList iterators = new ArrayList<>(); @@ -91,7 +89,6 @@ public class BatchInputSplitTest { split.setTableName("table"); split.setFetchedColumns(fetchedColumns); split.setIterators(iterators); - split.setLogLevel(Level.WARN); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); @@ -109,6 +106,5 @@ public class BatchInputSplitTest { assertEquals(split.getTableName(), newSplit.getTableName()); assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); assertEquals(split.getIterators(), newSplit.getIterators()); - assertEquals(split.getLogLevel(), newSplit.getLogLevel()); } } diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfigTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java similarity index 94% rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfigTest.java rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java index 847981f..0d25fee 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/InputTableConfigTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import static org.junit.Assert.assertEquals; @@ -31,7 +31,6 @@ import java.util.Set; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; import org.junit.Before; import org.junit.Test; @@ -82,9 +81,9 @@ public class InputTableConfigTest { @Test public void testSerialization_columns() throws IOException { - Set> columns = new HashSet<>(); - columns.add(new Pair<>(new Text("cf1"), new Text("cq1"))); - columns.add(new Pair<>(new Text("cf2"), null)); + Set columns = new HashSet<>(); + columns.add(new IteratorSetting.Column(new Text("cf1"), new Text("cq1"))); + columns.add(new IteratorSetting.Column(new Text("cf2"), null)); tableQueryConfig.fetchColumns(columns); byte[] serialized = serialize(tableQueryConfig); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplitTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplitTest.java similarity index 90% rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplitTest.java rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplitTest.java index 8be9c35..f9913ad 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/RangeInputSplitTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/RangeInputSplitTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.hadoop.mapreduce; +package org.apache.accumulo.hadoopImpl.mapreduce; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -35,9 +35,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; import org.junit.Test; import com.google.common.collect.ImmutableMap; @@ -70,10 +68,10 @@ public class RangeInputSplitTest { RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new Key("b")), new String[] {"localhost"}); - Set> fetchedColumns = new HashSet<>(); + Set fetchedColumns = new HashSet<>(); - fetchedColumns.add(new Pair<>(new Text("colf1"), new Text("colq1"))); - fetchedColumns.add(new Pair<>(new Text("colf2"), new Text("colq2"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf1"), new Text("colq1"))); + fetchedColumns.add(new IteratorSetting.Column(new Text("colf2"), new Text("colq2"))); // Fake some iterators ArrayList iterators = new ArrayList<>(); @@ -91,7 +89,6 @@ public class RangeInputSplitTest { split.setUsesLocalIterators(true); split.setFetchedColumns(fetchedColumns); split.setIterators(iterators); - split.setLogLevel(Level.WARN); split.setExecutionHints(ImmutableMap.of("priority", "9")); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -113,7 +110,6 @@ public class RangeInputSplitTest { assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators()); assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); assertEquals(split.getIterators(), newSplit.getIterators()); - assertEquals(split.getLogLevel(), newSplit.getLogLevel()); assertEquals(split.getExecutionHints(), newSplit.getExecutionHints()); } diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBaseTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBaseTest.java index 3ccae56..f4fcee4 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBaseTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBaseTest.java @@ -21,18 +21,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.util.Properties; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientInfo; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.ClientProperty; import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.Test; public class ConfiguratorBaseTest { @@ -89,45 +84,6 @@ public class ConfiguratorBaseTest { assertTrue(info2.getAuthenticationToken() instanceof PasswordToken); } - @SuppressWarnings("deprecation") - @Test - public void testSetZooKeeperInstance() { - Configuration conf = new Configuration(); - ConfiguratorBase.setZooKeeperInstance(this.getClass(), conf, - 
org.apache.accumulo.core.client.ClientConfiguration.create() - .withInstance("testInstanceName").withZkHosts("testZooKeepers").withSsl(true) - .withZkTimeout(15000)); - - org.apache.accumulo.core.client.ClientConfiguration clientConf = ConfiguratorBase - .getClientConfiguration(this.getClass(), conf); - assertEquals("testInstanceName", clientConf - .get(org.apache.accumulo.core.client.ClientConfiguration.ClientProperty.INSTANCE_NAME)); - - Properties props = ConfiguratorBase.getClientInfo(this.getClass(), conf).getProperties(); - assertEquals("testInstanceName", props.getProperty(ClientProperty.INSTANCE_NAME.getKey())); - assertEquals("testZooKeepers", props.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey())); - assertEquals("true", props.getProperty(ClientProperty.SSL_ENABLED.getKey())); - assertEquals("15000", props.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey())); - } - - @Test - public void testSetLogLevel() { - Configuration conf = new Configuration(); - Level currentLevel = Logger.getLogger(this.getClass()).getLevel(); - - ConfiguratorBase.setLogLevel(this.getClass(), conf, Level.DEBUG); - Logger.getLogger(this.getClass()).setLevel(currentLevel); - assertEquals(Level.DEBUG, ConfiguratorBase.getLogLevel(this.getClass(), conf)); - - ConfiguratorBase.setLogLevel(this.getClass(), conf, Level.INFO); - Logger.getLogger(this.getClass()).setLevel(currentLevel); - assertEquals(Level.INFO, ConfiguratorBase.getLogLevel(this.getClass(), conf)); - - ConfiguratorBase.setLogLevel(this.getClass(), conf, Level.FATAL); - Logger.getLogger(this.getClass()).setLevel(currentLevel); - assertEquals(Level.FATAL, ConfiguratorBase.getLogLevel(this.getClass(), conf)); - } - @Test public void testSetVisibilityCacheSize() { Configuration conf = new Configuration(); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitionerTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitionerTest.java similarity index 98% rename from hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitionerTest.java rename to hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitionerTest.java index 95451f3..325e5ed 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/lib/partition/RangePartitionerTest.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/partition/RangePartitionerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.hadoop.mapreduce.lib.partition; +package org.apache.accumulo.hadoopImpl.mapreduce.lib.partition; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/hadoop-mapreduce/src/test/resources/log4j.properties b/hadoop-mapreduce/src/test/resources/log4j.properties new file mode 100644 index 0000000..40adebf --- /dev/null +++ b/hadoop-mapreduce/src/test/resources/log4j.properties @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, CA +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=[%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.accumulo.core.iterators.system.VisibilityFilter=FATAL +log4j.logger.org.apache.accumulo.core.iterators.user.TransformingIteratorTest$IllegalVisCompactionKeyTransformingIterator=FATAL +log4j.logger.org.apache.accumulo.core.iterators.user.TransformingIteratorTest$IllegalVisKeyTransformingIterator=FATAL +log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN +log4j.logger.org.apache.hadoop.mapred=ERROR +log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=ERROR +log4j.logger.org.apache.hadoop.util.ProcessTree=ERROR +log4j.logger.org.apache.hadoop.io.compress.CodecPool=FATAL +log4j.logger.org.apache.hadoop.util.NativeCodeLoader=FATAL +log4j.logger.org.apache.accumulo.core.util.format=FATAL
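
For readers following the change above, here is a minimal sketch of how a MapReduce job is wired up with the new fluent builder introduced by this commit. It only chains the builder methods that the tests in this diff already exercise (clientInfo, table, scanAuths, addIterator, build, and the single AccumuloInputFormat.setInfo entry point); the ClientInfo argument, the table name "test", and the iterator priority/name are placeholder values for illustration, not part of the commit.

    import org.apache.accumulo.core.client.ClientInfo;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.user.WholeRowIterator;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.hadoop.mapreduce.InputInfo;
    import org.apache.hadoop.mapreduce.Job;

    public class NewApiUsageSketch {
      // Configure a job against the new single-entry-point API.
      // "clientInfo" and the table name "test" are placeholders for illustration.
      static void configure(Job job, ClientInfo clientInfo) throws Exception {
        IteratorSetting wholeRow = new IteratorSetting(50, "wholeRow", WholeRowIterator.class);

        job.setInputFormatClass(AccumuloInputFormat.class);
        AccumuloInputFormat.setInfo(job,
            InputInfo.builder()
                .clientInfo(clientInfo)           // connection details and credentials
                .table("test")                    // input table
                .scanAuths(Authorizations.EMPTY)  // scan authorizations
                .addIterator(wholeRow)            // optional server-side iterator
                .build());
      }
    }

The single setInfo(job, info) call takes the place of the per-option static setters (addIterator, fetchColumns, setBatchWriterOptions, and so on) that the removed hunks above delete from the public classes.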