From: mdrob@apache.org To: commits@accumulo.apache.org Reply-To: dev@accumulo.apache.org Date: Mon, 21 Apr 2014 23:48:11 -0000 Message-Id: <706c631eaa904715be8788b53988cb2f@git.apache.org> In-Reply-To: <240dd757dcce408aa794d95887da7cc2@git.apache.org> References: <240dd757dcce408aa794d95887da7cc2@git.apache.org> Subject: [09/11] ACCUMULO-1880 create mapreduce module http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java deleted file mode 100644 index e58e350..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.ClientSideIteratorScanner; -import org.apache.accumulo.core.client.IsolatedScanner; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.impl.TabletLocator; -import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs. - *

- * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V. - *

- * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its - * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V. - *

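As an illustrative sketch of the subclassing contract described above (not taken from the file being removed), a minimal subclass that emits Accumulo Key/Value pairs unchanged might look like the following. It loosely follows what AccumuloInputFormat itself does; the protected members used here (scannerIterator, currentK, currentV, currentKey) are assumed to be provided by the 1.6-era base record reader, and the class name is made up.

// Imports match those of the file above.
public class SimpleKeyValueInputFormat extends InputFormatBase<Key,Value> {
  @Override
  public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new RecordReaderBase<Key,Value>() {
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        // The base class sets up a scanner for this split; forward its entries as K,V.
        if (scannerIterator.hasNext()) {
          java.util.Map.Entry<Key,Value> entry = scannerIterator.next();
          currentK = currentKey = entry.getKey();
          currentV = entry.getValue();
          return true;
        }
        return false;
      }
    };
  }
}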
- * See {@link AccumuloInputFormat} for an example implementation. - */ -public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { - - /** - * Gets the table name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the table name - * @since 1.5.0 - * @see #setInputTableName(Job, String) - */ - protected static String getInputTableName(JobContext context) { - return InputConfigurator.getInputTableName(CLASS, getConfiguration(context)); - } - - /** - * Sets the name of the input table, over which this job will scan. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the name of the table over which this job will scan - * @since 1.5.0 - */ - public static void setInputTableName(Job job, String tableName) { - InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName); - } - - /** - * Sets the input ranges to scan for the single input table associated with this job. - * - * @param job - * the Hadoop job instance to be configured - * @param ranges - * the ranges that will be mapped over - * @since 1.5.0 - */ - public static void setRanges(Job job, Collection<Range> ranges) { - InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges); - } - - /** - * Gets the ranges to scan over from a job. - * - * @param context - * the Hadoop context for the configured job - * @return the ranges - * @since 1.5.0 - * @see #setRanges(Job, Collection) - */ - protected static List<Range> getRanges(JobContext context) throws IOException { - return InputConfigurator.getRanges(CLASS, getConfiguration(context)); - } - - /** - * Restricts the columns that will be mapped over for this job for the default input table. - * - * @param job - * the Hadoop job instance to be configured - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is - * selected. An empty set is the default and is equivalent to scanning all columns. - * @since 1.5.0 - */ - public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) { - InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs); - } - - /** - * Gets the columns to be mapped over from this job. - * - * @param context - * the Hadoop context for the configured job - * @return a set of columns - * @since 1.5.0 - * @see #fetchColumns(Job, Collection) - */ - protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) { - return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context)); - } - - /** - * Encodes an iterator on the single input table for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param cfg - * the configuration of the iterator - * @since 1.5.0 - */ - public static void addIterator(Job job, IteratorSetting cfg) { - InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg); - } - - /** - * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return a list of iterators - * @since 1.5.0 - * @see #addIterator(Job, IteratorSetting) - */ - protected static List<IteratorSetting> getIterators(JobContext context) { - return InputConfigurator.getIterators(CLASS, getConfiguration(context)); - } - - /** - * Controls the automatic adjustment of ranges for this job.
This feature merges overlapping ranges, then splits them to align with tablet boundaries. - * Disabling this feature will cause exactly one Map task to be created for each specified range. - * - *
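As an illustrative sketch only (not part of the deleted file), a single-table job is typically wired up with these static setters roughly as follows. The connector and instance calls are the ones documented later in this message; setScanAuthorizations, PasswordToken, and RegExFilter come from the standard client API, and the instance, user, table, and column names are placeholders.

// Imports from org.apache.accumulo.core.client.*, .data, .security, .util and org.apache.hadoop.* elided.
Job job = Job.getInstance(new Configuration());
AccumuloInputFormat.setConnectorInfo(job, "reader", new PasswordToken("secret"));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance("myInstance").withZkHosts("zk1:2181"));
AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
AccumuloInputFormat.setInputTableName(job, "myTable");
AccumuloInputFormat.setRanges(job, Collections.singleton(new Range("a", "m")));
AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(new Text("cf"), null)));
IteratorSetting regex = new IteratorSetting(50, "rowRegex", RegExFilter.class);
RegExFilter.setRegexs(regex, "row_.*", null, null, null, false);
AccumuloInputFormat.addIterator(job, regex);
AccumuloInputFormat.setAutoAdjustRanges(job, true); // the default: merge overlapping ranges, then split on tablet boundaries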

- * By default, this feature is enabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @see #setRanges(Job, Collection) - * @since 1.5.0 - */ - public static void setAutoAdjustRanges(Job job, boolean enableFeature) { - InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has auto-adjust ranges enabled. - * - * @param context - * the Hadoop context for the configured job - * @return false if the feature is disabled, true otherwise - * @since 1.5.0 - * @see #setAutoAdjustRanges(Job, boolean) - */ - protected static boolean getAutoAdjustRanges(JobContext context) { - return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context)); - } - - /** - * Controls the use of the {@link IsolatedScanner} in this job. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setScanIsolation(Job job, boolean enableFeature) { - InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has isolation enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setScanIsolation(Job, boolean) - */ - protected static boolean isIsolated(JobContext context) { - return InputConfigurator.isIsolated(CLASS, getConfiguration(context)); - } - - /** - * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map - * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. - * - *

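For illustration (not from the deleted file), both of these scanner-behavior switches are plain boolean toggles on the configured job; enabling local iterators requires the iterator classes on the mapper's classpath.

AccumuloInputFormat.setScanIsolation(job, true);   // wrap the scan in an IsolatedScanner
AccumuloInputFormat.setLocalIterators(job, true);  // build the iterator stack in the map task, not the tserver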
- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setLocalIterators(Job job, boolean enableFeature) { - InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration uses local iterators. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setLocalIterators(Job, boolean) - */ - protected static boolean usesLocalIterators(JobContext context) { - return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context)); - } - - /** - *

- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the - * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will - * fail. - * - *

- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * - *

- * Reading an offline table constructs the scan-time iterator stack in the map process, so any iterators that are configured for the table must be - * on the mapper's classpath. - *

- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map - * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The - * reason to do this is that compaction reduces each tablet in the table to one file, and it is faster to read from one file. - *

- * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it supports - * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - *
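A rough sketch of the clone-and-take-offline workflow described above, assuming a Connector named connector and placeholder table names (the TableOperations calls come from the standard client API, not from this file):

TableOperations ops = connector.tableOperations();
ops.compact("myTable", null, null, true, true);             // optional: reduce each tablet to one file before cloning
ops.clone("myTable", "myTable_offline", true, null, null);  // flush and clone
ops.offline("myTable_offline");                             // the job reads the clone's files directly
AccumuloInputFormat.setInputTableName(job, "myTable_offline");
AccumuloInputFormat.setOfflineTableScan(job, true);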

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setOfflineTableScan(Job job, boolean enableFeature) { - InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has the offline table scan feature enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setOfflineTableScan(Job, boolean) - */ - protected static boolean isOfflineScan(JobContext context) { - return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context)); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo tablet locator - * @throws org.apache.accumulo.core.client.TableNotFoundException - * if the table name set on the configuration doesn't exist - * @since 1.5.0 - * @deprecated since 1.6.0 - */ - @Deprecated - protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); - } - - protected abstract static class RecordReaderBase extends AbstractRecordReader { - - /** - * Apply the configured iterators from the configuration to the scanner for the specified table name - * - * @param context - * the Hadoop context for the configured job - * @param scanner - * the scanner to configure - * @since 1.6.0 - */ - @Override - protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - setupIterators(context, scanner, split); - } - - /** - * Apply the configured iterators from the configuration to the scanner. - * - * @param context - * the Hadoop context for the configured job - * @param scanner - * the scanner to configure - */ - @Deprecated - protected void setupIterators(TaskAttemptContext context, Scanner scanner) { - setupIterators(context, scanner, null); - } - - /** - * Initialize a scanner over the given input split using this task attempt configuration. - */ - protected void setupIterators(TaskAttemptContext context, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - List iterators = null; - if (null == split) { - iterators = getIterators(context); - } else { - iterators = split.getIterators(); - if (null == iterators) { - iterators = getIterators(context); - } - } - for (IteratorSetting iterator : iterators) - scanner.addScanIterator(iterator); - } - } - - /** - * @deprecated since 1.5.2; Use {@link org.apache.accumulo.core.client.mapreduce.RangeInputSplit} instead. 
- * @see org.apache.accumulo.core.client.mapreduce.RangeInputSplit - */ - @Deprecated - public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit { - - public RangeInputSplit() { - super(); - } - - public RangeInputSplit(RangeInputSplit other) throws IOException { - super(other); - } - - protected RangeInputSplit(String table, Range range, String[] locations) { - super(table, "", range, locations); - } - - public RangeInputSplit(String table, String tableId, Range range, String[] locations) { - super(table, tableId, range, locations); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java deleted file mode 100644 index e59451e..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * This class to holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table. - */ -public class InputTableConfig implements Writable { - - private List iterators; - private List ranges; - private Collection> columns; - - private boolean autoAdjustRanges = true; - private boolean useLocalIterators = false; - private boolean useIsolatedScanners = false; - private boolean offlineScan = false; - - public InputTableConfig() {} - - /** - * Creates a batch scan config object out of a previously serialized batch scan config object. - * - * @param input - * the data input of the serialized batch scan config - */ - public InputTableConfig(DataInput input) throws IOException { - readFields(input); - } - - /** - * Sets the input ranges to scan for all tables associated with this job. 
These ranges replace any ranges previously set on this configuration object. - * - * @param ranges - * the ranges that will be mapped over - * @since 1.6.0 - */ - public InputTableConfig setRanges(List<Range> ranges) { - this.ranges = ranges; - return this; - } - - /** - * Returns the ranges to be queried in the configuration - */ - public List<Range> getRanges() { - return ranges != null ? ranges : new ArrayList<Range>(); - } - - /** - * Restricts the columns that will be mapped over for this job for the default input table. - * - * @param columns - * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is - * selected. An empty set is the default and is equivalent to scanning all columns. - * @since 1.6.0 - */ - public InputTableConfig fetchColumns(Collection<Pair<Text,Text>> columns) { - this.columns = columns; - return this; - } - - /** - * Returns the columns to be fetched for this configuration - */ - public Collection<Pair<Text,Text>> getFetchedColumns() { - return columns != null ? columns : new HashSet<Pair<Text,Text>>(); - } - - /** - * Sets the iterators to be used in the query. - * - * @param iterators - * the configurations for the iterators - * @since 1.6.0 - */ - public InputTableConfig setIterators(List<IteratorSetting> iterators) { - this.iterators = iterators; - return this; - } - - /** - * Returns the iterators to be set on this configuration - */ - public List<IteratorSetting> getIterators() { - return iterators != null ? iterators : new ArrayList<IteratorSetting>(); - } - - /** - * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. - * Disabling this feature will cause exactly one Map task to be created for each specified range. - * - *
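For illustration (not part of the deleted file), a per-table configuration is typically assembled with the fluent setters above and handed to the multi-table input format; AccumuloMultiTableInputFormat.setInputTableConfigs is assumed here from the 1.6 API, and the table names are placeholders.

InputTableConfig usersConfig = new InputTableConfig()
    .setRanges(Collections.singletonList(new Range("a", "z")))
    .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("attr"), null)))
    .setIterators(Collections.singletonList(new IteratorSetting(60, "vers", VersioningIterator.class)));

Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
configs.put("users", usersConfig);
configs.put("events", new InputTableConfig());   // a second table with default settings
AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);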

- * By default, this feature is enabled. - * - * @param autoAdjustRanges - * the feature is enabled if true, disabled otherwise - * @see #setRanges(java.util.List) - * @since 1.6.0 - */ - public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) { - this.autoAdjustRanges = autoAdjustRanges; - return this; - } - - /** - * Determines whether a configuration has auto-adjust ranges enabled. - * - * @return false if the feature is disabled, true otherwise - * @since 1.6.0 - * @see #setAutoAdjustRanges(boolean) - */ - public boolean shouldAutoAdjustRanges() { - return autoAdjustRanges; - } - - /** - * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack - * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be - * available on the classpath for the task. - * - *

- * By default, this feature is disabled. - * - * @param useLocalIterators - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setUseLocalIterators(boolean useLocalIterators) { - this.useLocalIterators = useLocalIterators; - return this; - } - - /** - * Determines whether a configuration uses local iterators. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setUseLocalIterators(boolean) - */ - public boolean shouldUseLocalIterators() { - return useLocalIterators; - } - - /** - *

- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the - * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will - * fail. - * - *

- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * - *

- * Reading an offline table constructs the scan-time iterator stack in the map process, so any iterators that are configured for the table must be - * on the mapper's classpath. The accumulo-site.xml file may also need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS is non-standard. - *

- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map - * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The - * reason to do this is that compaction reduces each tablet in the table to one file, and it is faster to read from one file. - *

- * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it supports - * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - *

- * By default, this feature is disabled. - * - * @param offlineScan - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setOfflineScan(boolean offlineScan) { - this.offlineScan = offlineScan; - return this; - } - - /** - * Determines whether a configuration has the offline table scan feature enabled. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setOfflineScan(boolean) - */ - public boolean isOfflineScan() { - return offlineScan; - } - - /** - * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job. - * - *

- * By default, this feature is disabled. - * - * @param useIsolatedScanners - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) { - this.useIsolatedScanners = useIsolatedScanners; - return this; - } - - /** - * Determines whether a configuration has isolation enabled. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setUseIsolatedScanners(boolean) - */ - public boolean shouldUseIsolatedScanners() { - return useIsolatedScanners; - } - - /** - * Writes the state for the current object out to the specified {@link DataOutput} - * - * @param dataOutput - * the output for which to write the object's state - */ - @Override - public void write(DataOutput dataOutput) throws IOException { - if (iterators != null) { - dataOutput.writeInt(iterators.size()); - for (IteratorSetting setting : iterators) - setting.write(dataOutput); - } else { - dataOutput.writeInt(0); - } - if (ranges != null) { - dataOutput.writeInt(ranges.size()); - for (Range range : ranges) - range.write(dataOutput); - } else { - dataOutput.writeInt(0); - } - if (columns != null) { - dataOutput.writeInt(columns.size()); - for (Pair column : columns) { - if (column.getSecond() == null) { - dataOutput.writeInt(1); - column.getFirst().write(dataOutput); - } else { - dataOutput.writeInt(2); - column.getFirst().write(dataOutput); - column.getSecond().write(dataOutput); - } - } - } else { - dataOutput.writeInt(0); - } - dataOutput.writeBoolean(autoAdjustRanges); - dataOutput.writeBoolean(useLocalIterators); - dataOutput.writeBoolean(useIsolatedScanners); - } - - /** - * Reads the fields in the {@link DataInput} into the current object - * - * @param dataInput - * the input fields to read into the current object - */ - @Override - public void readFields(DataInput dataInput) throws IOException { - // load iterators - long iterSize = dataInput.readInt(); - if (iterSize > 0) - iterators = new ArrayList(); - for (int i = 0; i < iterSize; i++) - iterators.add(new IteratorSetting(dataInput)); - // load ranges - long rangeSize = dataInput.readInt(); - if (rangeSize > 0) - ranges = new ArrayList(); - for (int i = 0; i < rangeSize; i++) { - Range range = new Range(); - range.readFields(dataInput); - ranges.add(range); - } - // load columns - long columnSize = dataInput.readInt(); - if (columnSize > 0) - columns = new HashSet>(); - for (int i = 0; i < columnSize; i++) { - long numPairs = dataInput.readInt(); - Text colFam = new Text(); - colFam.readFields(dataInput); - if (numPairs == 1) { - columns.add(new Pair(colFam, null)); - } else if (numPairs == 2) { - Text colQual = new Text(); - colQual.readFields(dataInput); - columns.add(new Pair(colFam, colQual)); - } - } - autoAdjustRanges = dataInput.readBoolean(); - useLocalIterators = dataInput.readBoolean(); - useIsolatedScanners = dataInput.readBoolean(); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - InputTableConfig that = (InputTableConfig) o; - - if (autoAdjustRanges != that.autoAdjustRanges) - return false; - if (offlineScan != that.offlineScan) - return false; - if (useIsolatedScanners != that.useIsolatedScanners) - return false; - if (useLocalIterators != that.useLocalIterators) - return false; - if (columns != null ? !columns.equals(that.columns) : that.columns != null) - return false; - if (iterators != null ? 
!iterators.equals(that.iterators) : that.iterators != null) - return false; - if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) - return false; - return true; - } - - @Override - public int hashCode() { - int result = 31 * (iterators != null ? iterators.hashCode() : 0); - result = 31 * result + (ranges != null ? ranges.hashCode() : 0); - result = 31 * result + (columns != null ? columns.hashCode() : 0); - result = 31 * result + (autoAdjustRanges ? 1 : 0); - result = 31 * result + (useLocalIterators ? 1 : 0); - result = 31 * result + (useIsolatedScanners ? 1 : 0); - result = 31 * result + (offlineScan ? 1 : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java deleted file mode 100644 index 4b5a149..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; -import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.Pair; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.log4j.Level; - -/** - * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. - */ -public class RangeInputSplit extends InputSplit implements Writable { - private Range range; - private String[] locations; - private String tableId, tableName, instanceName, zooKeepers, principal; - private TokenSource tokenSource; - private String tokenFile; - private AuthenticationToken token; - private Boolean offline, mockInstance, isolatedScan, localIterators; - private Authorizations auths; - private Set> fetchedColumns; - private List iterators; - private Level level; - - public RangeInputSplit() { - range = new Range(); - locations = new String[0]; - tableName = ""; - tableId = ""; - } - - public RangeInputSplit(RangeInputSplit split) throws IOException { - this.setRange(split.getRange()); - this.setLocations(split.getLocations()); - this.setTableName(split.getTableName()); - this.setTableId(split.getTableId()); - } - - protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { - this.range = range; - setLocations(locations); - this.tableName = table; - this.tableId = tableId; - } - - public Range getRange() { - return range; - } - - private static byte[] extractBytes(ByteSequence seq, int numBytes) { - byte[] bytes = new byte[numBytes + 1]; - bytes[0] = 0; - for (int i = 0; i < numBytes; i++) { - if (i >= seq.length()) - bytes[i + 1] = 0; - else - bytes[i + 1] = seq.byteAt(i); - } - return bytes; - } - - public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { - int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); - BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); - BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); - BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); - return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); - } - - public float getProgress(Key currentKey) { - if (currentKey == null) - 
return 0f; - if (range.getStartKey() != null && range.getEndKey() != null) { - if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { - // just look at the row progress - return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { - // just look at the column family progress - return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { - // just look at the column qualifier progress - return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); - } - } - // if we can't figure it out, then claim no progress - return 0f; - } - - /** - * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. - */ - @Override - public long getLength() throws IOException { - Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); - Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); - int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); - long diff = 0; - - byte[] start = startRow.getBytes(); - byte[] stop = stopRow.getBytes(); - for (int i = 0; i < maxCommon; ++i) { - diff |= 0xff & (start[i] ^ stop[i]); - diff <<= Byte.SIZE; - } - - if (startRow.getLength() != stopRow.getLength()) - diff |= 0xff; - - return diff + 1; - } - - @Override - public String[] getLocations() throws IOException { - return Arrays.copyOf(locations, locations.length); - } - - @Override - public void readFields(DataInput in) throws IOException { - range.readFields(in); - tableName = in.readUTF(); - tableId = in.readUTF(); - int numLocs = in.readInt(); - locations = new String[numLocs]; - for (int i = 0; i < numLocs; ++i) - locations[i] = in.readUTF(); - - if (in.readBoolean()) { - isolatedScan = in.readBoolean(); - } - - if (in.readBoolean()) { - offline = in.readBoolean(); - } - - if (in.readBoolean()) { - localIterators = in.readBoolean(); - } - - if (in.readBoolean()) { - mockInstance = in.readBoolean(); - } - - if (in.readBoolean()) { - int numColumns = in.readInt(); - List columns = new ArrayList(numColumns); - for (int i = 0; i < numColumns; i++) { - columns.add(in.readUTF()); - } - - fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns); - } - - if (in.readBoolean()) { - String strAuths = in.readUTF(); - auths = new Authorizations(strAuths.getBytes(StandardCharsets.UTF_8)); - } - - if (in.readBoolean()) { - principal = in.readUTF(); - } - - if (in.readBoolean()) { - int ordinal = in.readInt(); - this.tokenSource = TokenSource.values()[ordinal]; - - switch (this.tokenSource) { - case INLINE: - String tokenClass = in.readUTF(); - byte[] base64TokenBytes = in.readUTF().getBytes(StandardCharsets.UTF_8); - byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes); - - this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes); - break; - - case FILE: - this.tokenFile = in.readUTF(); - - break; - default: - throw new IOException("Cannot parse unknown TokenSource ordinal"); - } - } - - if (in.readBoolean()) { - instanceName = in.readUTF(); - } - - if 
(in.readBoolean()) { - zooKeepers = in.readUTF(); - } - - if (in.readBoolean()) { - level = Level.toLevel(in.readInt()); - } - } - - @Override - public void write(DataOutput out) throws IOException { - range.write(out); - out.writeUTF(tableName); - out.writeUTF(tableId); - out.writeInt(locations.length); - for (int i = 0; i < locations.length; ++i) - out.writeUTF(locations[i]); - - out.writeBoolean(null != isolatedScan); - if (null != isolatedScan) { - out.writeBoolean(isolatedScan); - } - - out.writeBoolean(null != offline); - if (null != offline) { - out.writeBoolean(offline); - } - - out.writeBoolean(null != localIterators); - if (null != localIterators) { - out.writeBoolean(localIterators); - } - - out.writeBoolean(null != mockInstance); - if (null != mockInstance) { - out.writeBoolean(mockInstance); - } - - out.writeBoolean(null != fetchedColumns); - if (null != fetchedColumns) { - String[] cols = InputConfigurator.serializeColumns(fetchedColumns); - out.writeInt(cols.length); - for (String col : cols) { - out.writeUTF(col); - } - } - - out.writeBoolean(null != auths); - if (null != auths) { - out.writeUTF(auths.serialize()); - } - - out.writeBoolean(null != principal); - if (null != principal) { - out.writeUTF(principal); - } - - out.writeBoolean(null != tokenSource); - if (null != tokenSource) { - out.writeInt(tokenSource.ordinal()); - - if (null != token && null != tokenFile) { - throw new IOException("Cannot use both inline AuthenticationToken and file-based AuthenticationToken"); - } else if (null != token) { - out.writeUTF(token.getClass().getCanonicalName()); - out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); - } else { - out.writeUTF(tokenFile); - } - } - - out.writeBoolean(null != instanceName); - if (null != instanceName) { - out.writeUTF(instanceName); - } - - out.writeBoolean(null != zooKeepers); - if (null != zooKeepers) { - out.writeUTF(zooKeepers); - } - - out.writeBoolean(null != level); - if (null != level) { - out.writeInt(level.toInt()); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(256); - sb.append("Range: ").append(range); - sb.append(" Locations: ").append(Arrays.asList(locations)); - sb.append(" Table: ").append(tableName); - sb.append(" TableID: ").append(tableId); - sb.append(" InstanceName: ").append(instanceName); - sb.append(" zooKeepers: ").append(zooKeepers); - sb.append(" principal: ").append(principal); - sb.append(" tokenSource: ").append(tokenSource); - sb.append(" authenticationToken: ").append(token); - sb.append(" authenticationTokenFile: ").append(tokenFile); - sb.append(" Authorizations: ").append(auths); - sb.append(" offlineScan: ").append(offline); - sb.append(" mockInstance: ").append(mockInstance); - sb.append(" isolatedScan: ").append(isolatedScan); - sb.append(" localIterators: ").append(localIterators); - sb.append(" fetchColumns: ").append(fetchedColumns); - sb.append(" iterators: ").append(iterators); - sb.append(" logLevel: ").append(level); - return sb.toString(); - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String table) { - this.tableName = table; - } - - public void setTableId(String tableId) { - this.tableId = tableId; - } - - public String getTableId() { - return tableId; - } - - public Instance getInstance() { - if (null == instanceName) { - return null; - } - - if (isMockInstance()) { - return new MockInstance(getInstanceName()); - } - - if (null == zooKeepers) { - return null; - } - - return new 
ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(getInstanceName()).withZkHosts(getZooKeepers())); - } - - public String getInstanceName() { - return instanceName; - } - - public void setInstanceName(String instanceName) { - this.instanceName = instanceName; - } - - public String getZooKeepers() { - return zooKeepers; - } - - public void setZooKeepers(String zooKeepers) { - this.zooKeepers = zooKeepers; - } - - public String getPrincipal() { - return principal; - } - - public void setPrincipal(String principal) { - this.principal = principal; - } - - public AuthenticationToken getToken() { - return token; - } - - public void setToken(AuthenticationToken token) { - this.tokenSource = TokenSource.INLINE; - this.token = token; - } - - public void setToken(String tokenFile) { - this.tokenSource = TokenSource.FILE; - this.tokenFile = tokenFile; - } - - public Boolean isOffline() { - return offline; - } - - public void setOffline(Boolean offline) { - this.offline = offline; - } - - public void setLocations(String[] locations) { - this.locations = Arrays.copyOf(locations, locations.length); - } - - public Boolean isMockInstance() { - return mockInstance; - } - - public void setMockInstance(Boolean mockInstance) { - this.mockInstance = mockInstance; - } - - public Boolean isIsolatedScan() { - return isolatedScan; - } - - public void setIsolatedScan(Boolean isolatedScan) { - this.isolatedScan = isolatedScan; - } - - public Authorizations getAuths() { - return auths; - } - - public void setAuths(Authorizations auths) { - this.auths = auths; - } - - public void setRange(Range range) { - this.range = range; - } - - public Boolean usesLocalIterators() { - return localIterators; - } - - public void setUsesLocalIterators(Boolean localIterators) { - this.localIterators = localIterators; - } - - public Set> getFetchedColumns() { - return fetchedColumns; - } - - public void setFetchedColumns(Collection> fetchedColumns) { - this.fetchedColumns = new HashSet>(); - for (Pair columns : fetchedColumns) { - this.fetchedColumns.add(columns); - } - } - - public void setFetchedColumns(Set> fetchedColumns) { - this.fetchedColumns = fetchedColumns; - } - - public List getIterators() { - return iterators; - } - - public void setIterators(List iterators) { - this.iterators = iterators; - } - - public Level getLogLevel() { - return level; - } - - public void setLogLevel(Level level) { - this.level = level; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java deleted file mode 100644 index 4610556..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java +++ /dev/null @@ -1,369 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; -import org.apache.accumulo.core.security.Credentials; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -/** - * @since 1.6.0 - */ -public class ConfiguratorBase { - - /** - * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}. - * - * @since 1.6.0 - */ - public static enum ConnectorInfo { - IS_CONFIGURED, PRINCIPAL, TOKEN, - } - - public static enum TokenSource { - FILE, INLINE; - - private String prefix; - - private TokenSource() { - prefix = name().toLowerCase() + ":"; - } - - public String prefix() { - return prefix; - } - } - - /** - * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}. - * - * @since 1.6.0 - */ - public static enum InstanceOpts { - TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG; - } - - /** - * Configuration keys for general configuration options. - * - * @since 1.6.0 - */ - public static enum GeneralOpts { - LOG_LEVEL - } - - /** - * Provides a configuration key for a given feature enum, prefixed by the implementingClass - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param e - * the enum used to provide the unique part of the configuration key - * @return the configuration key - * @since 1.6.0 - */ - protected static String enumToConfKey(Class implementingClass, Enum e) { - return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase()); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

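Callers usually reach these helpers through the static wrappers on the format classes rather than invoking ConfiguratorBase directly; a sketch of the two styles, with placeholder principal, password, and file path (the token-file overload is documented just below):

// Inline token: Base64-serialized into the job configuration -- convenient, but not secret.
AccumuloInputFormat.setConnectorInfo(job, "mapred_user", new PasswordToken("secret"));

// Token file: created beforehand with "bin/accumulo create-token" and shipped via the distributed cache.
AccumuloInputFormat.setConnectorInfo(job, "mapred_user", "/user/mapred_user/mapred.token");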
- * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param principal - * a valid Accumulo user name - * @param token - * the user's password - * @since 1.6.0 - */ - public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - if (isConnectorInfoSet(implementingClass, conf)) - throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); - - checkArgument(principal != null, "principal is null"); - checkArgument(token != null, "token is null"); - conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), - TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * Pulls a token file into the Distributed Cache that contains the authentication token in an attempt to be more secure than storing the password in the - * Configuration. Token file created with "bin/accumulo create-token". - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param principal - * a valid Accumulo user name - * @param tokenFile - * the path to the token file in DFS - * @since 1.6.0 - */ - public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, String tokenFile) throws AccumuloSecurityException { - if (isConnectorInfoSet(implementingClass, conf)) - throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); - - checkArgument(principal != null, "principal is null"); - checkArgument(tokenFile != null, "tokenFile is null"); - - try { - DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf); - } catch (URISyntaxException e) { - throw new IllegalStateException("Unable to add tokenFile \"" + tokenFile + "\" to distributed cache."); - } - - conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.FILE.prefix() + tokenFile); - } - - /** - * Determines if the connector info has already been set for this instance. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the connector info has already been set, false otherwise - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static Boolean isConnectorInfoSet(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false); - } - - /** - * Gets the user name from the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the principal - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static String getPrincipal(Class implementingClass, Configuration conf) { - return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL)); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - * @see #setConnectorInfo(Class, Configuration, String, String) - */ - public static AuthenticationToken getAuthenticationToken(Class implementingClass, Configuration conf) { - String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN)); - if (token == null || token.isEmpty()) - return null; - if (token.startsWith(TokenSource.INLINE.prefix())) { - String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2); - if (args.length == 2) - return AuthenticationTokenSerializer.deserialize(args[0], Base64.decodeBase64(args[1].getBytes(StandardCharsets.UTF_8))); - } else if (token.startsWith(TokenSource.FILE.prefix())) { - String tokenFileName = token.substring(TokenSource.FILE.prefix().length()); - return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName); - } - - throw new IllegalStateException("Token was not properly serialized into the configuration"); - } - - /** - * Reads from the token file in distributed cache. Currently, the token file stores data separated by colons e.g. principal:token_class:token - * - * @param conf - * the Hadoop context for the configured job - * @return path to the token file as a String - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static AuthenticationToken getTokenFromFile(Configuration conf, String principal, String tokenFile) { - FSDataInputStream in = null; - try { - URI[] uris = DistributedCacheHelper.getCacheFiles(conf); - Path path = null; - for (URI u : uris) { - if (u.toString().equals(tokenFile)) { - path = new Path(u); - } - } - if (path == null) { - throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + "\" in cache."); - } - FileSystem fs = FileSystem.get(conf); - in = fs.open(path); - } catch (IOException e) { - throw new IllegalArgumentException("Couldn't open password file called \"" + tokenFile + "\"."); - } - try (java.util.Scanner fileScanner = new java.util.Scanner(in)) { - while (fileScanner.hasNextLine()) { - Credentials creds = Credentials.deserialize(fileScanner.nextLine()); - if (principal.equals(creds.getPrincipal())) { - return creds.getToken(); - } - } - throw new IllegalArgumentException("Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\""); - } - } - - /** - * Configures a {@link ZooKeeperInstance} for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. 
- * @since 1.6.0 - */ - public static void setZooKeeperInstance(Class implementingClass, Configuration conf, ClientConfiguration clientConfig) { - String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); - if (!conf.get(key, "").isEmpty()) - throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); - conf.set(key, "ZooKeeperInstance"); - if (clientConfig != null) { - conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG), clientConfig.serialize()); - } - } - - /** - * Configures a {@link MockInstance} for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param instanceName - * the Accumulo instance name - * @since 1.6.0 - */ - public static void setMockInstance(Class implementingClass, Configuration conf, String instanceName) { - String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); - if (!conf.get(key, "").isEmpty()) - throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); - conf.set(key, "MockInstance"); - - checkArgument(instanceName != null, "instanceName is null"); - conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName); - } - - /** - * Initializes an Accumulo {@link Instance} based on the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return an Accumulo instance - * @since 1.6.0 - * @see #setZooKeeperInstance(Class, Configuration, ClientConfiguration) - * @see #setMockInstance(Class, Configuration, String) - */ - public static Instance getInstance(Class implementingClass, Configuration conf) { - String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), ""); - if ("MockInstance".equals(instanceType)) - return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME))); - else if ("ZooKeeperInstance".equals(instanceType)) { - String clientConfigString = conf.get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG)); - if (clientConfigString == null) { - String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)); - String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS)); - return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers)); - } else { - return new ZooKeeperInstance(ClientConfiguration.deserialize(clientConfigString)); - } - } else if (instanceType.isEmpty()) - throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName()); - else - throw new IllegalStateException("Unrecognized instance type " + instanceType); - } - - /** - * Sets the log level for this job. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param level - * the logging level - * @since 1.6.0 - */ - public static void setLogLevel(Class implementingClass, Configuration conf, Level level) { - checkArgument(level != null, "level is null"); - Logger.getLogger(implementingClass).setLevel(level); - conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt()); - } - - /** - * Gets the log level from this configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the log level - * @since 1.6.0 - * @see #setLogLevel(Class, Configuration, Level) - */ - public static Level getLogLevel(Class implementingClass, Configuration conf) { - return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt())); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java deleted file mode 100644 index c694b9a..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import java.io.IOException; -import java.net.URI; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.Path; - -/** - * @since 1.6.0 - */ -@SuppressWarnings("deprecation") -public class DistributedCacheHelper { - - /** - * @since 1.6.0 - */ - public static void addCacheFile(URI uri, Configuration conf) { - DistributedCache.addCacheFile(uri, conf); - } - - /** - * @since 1.6.0 - */ - public static URI[] getCacheFiles(Configuration conf) throws IOException { - return DistributedCache.getCacheFiles(conf); - } - - /** - * @since 1.6.0 - */ - public static Path[] getLocalCacheFiles(Configuration conf) throws IOException { - return DistributedCache.getLocalCacheFiles(conf); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java deleted file mode 100644 index ce84209..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import java.util.Arrays; -import java.util.Map.Entry; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; -import org.apache.hadoop.conf.Configuration; - -/** - * @since 1.6.0 - */ -public class FileOutputConfigurator extends ConfiguratorBase { - - /** - * Configuration keys for {@link AccumuloConfiguration}. - * - * @since 1.6.0 - */ - public static enum Opts { - ACCUMULO_PROPERTIES; - } - - /** - * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.
- * These properties correspond to the supported public static setter methods available to this class. - * - * @param property - * the Accumulo property to check - * @since 1.6.0 - */ - protected static Boolean isSupportedAccumuloProperty(Property property) { - switch (property) { - case TABLE_FILE_COMPRESSION_TYPE: - case TABLE_FILE_COMPRESSED_BLOCK_SIZE: - case TABLE_FILE_BLOCK_SIZE: - case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX: - case TABLE_FILE_REPLICATION: - return true; - default: - return false; - } - } - - /** - * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param property - * the supported Accumulo property - * @param value - * the value of the property to set - * @since 1.6.0 - */ - private static void setAccumuloProperty(Class implementingClass, Configuration conf, Property property, T value) { - if (isSupportedAccumuloProperty(property)) { - String val = String.valueOf(value); - if (property.getType().isValidFormat(val)) - conf.set(enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + property.getKey(), val); - else - throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'"); - } else - throw new IllegalArgumentException("Unsupported configuration property " + property.getKey()); - } - - /** - * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been - * stored in the Job's configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @since 1.6.0 - */ - public static AccumuloConfiguration getAccumuloConfiguration(Class implementingClass, Configuration conf) { - String prefix = enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "."; - ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); - for (Entry entry : conf) - if (entry.getKey().startsWith(prefix)) - acuConf.set(Property.getPropertyByKey(entry.getKey().substring(prefix.length())), entry.getValue()); - return acuConf; - } - - /** - * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param compressionType - * one of "none", "gz", "lzo", or "snappy" - * @since 1.6.0 - */ - public static void setCompressionType(Class implementingClass, Configuration conf, String compressionType) { - if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType)) - throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy"); - setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType); - } - - /** - * Sets the size for data blocks within each file.
- * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group. - * - *
- * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param dataBlockSize - * the block size, in bytes - * @since 1.6.0 - */ - public static void setDataBlockSize(Class implementingClass, Configuration conf, long dataBlockSize) { - setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize); - } - - /** - * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param fileBlockSize - * the block size, in bytes - * @since 1.6.0 - */ - public static void setFileBlockSize(Class implementingClass, Configuration conf, long fileBlockSize) { - setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize); - } - - /** - * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow - * index hierarchy within the file. This can affect the performance of queries. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param indexBlockSize - * the block size, in bytes - * @since 1.6.0 - */ - public static void setIndexBlockSize(Class implementingClass, Configuration conf, long indexBlockSize) { - setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize); - } - - /** - * Sets the file system replication factor for the resulting file, overriding the file system default. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param replication - * the number of replicas for produced files - * @since 1.6.0 - */ - public static void setReplication(Class implementingClass, Configuration conf, int replication) { - setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_REPLICATION, replication); - } - -}
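These setters only stage values in the Hadoop Configuration, under keys prefixed by the implementing class name; getAccumuloConfiguration later overlays those keys on the Accumulo defaults. A minimal sketch of how a job might wire them up, assuming AccumuloFileOutputFormat as the implementing class and purely illustrative sizes (the sketch is not part of the diff above):

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class FileOutputTuningSketch {
      // Stages file-output table properties in the job configuration,
      // keyed by the implementing class (assumed here to be AccumuloFileOutputFormat).
      public static void configure(Job job) {
        Configuration conf = job.getConfiguration();
        Class<?> impl = AccumuloFileOutputFormat.class;
        FileOutputConfigurator.setCompressionType(impl, conf, "gz");             // one of none, gz, lzo, snappy
        FileOutputConfigurator.setDataBlockSize(impl, conf, 64 * 1024);          // compressed data block size, in bytes
        FileOutputConfigurator.setIndexBlockSize(impl, conf, 32 * 1024);         // index block size, in bytes
        FileOutputConfigurator.setFileBlockSize(impl, conf, 128L * 1024 * 1024); // file system block size, in bytes
        FileOutputConfigurator.setReplication(impl, conf, 3);                    // file system replication factor
      }
    }

Because every setter funnels through setAccumuloProperty, an unsupported property or a value that fails its property type's format check is rejected with an IllegalArgumentException at configure time, before the job is submitted.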