From: ctubbsii@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Mon, 21 Apr 2014 22:16:13 -0000
Subject: [04/11] Revert "ACCUMULO-1880 create mapreduce module"

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java deleted file mode 100644 index af9bbae..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ /dev/null @@ -1,545 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.SecurityErrorCode; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; -import org.apache.accumulo.core.data.ColumnUpdate; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -/** - * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table - * name) and {@link Mutation} from the Map and Reduce functions. - * - * The user must specify the following via static configurator methods: - * - *
    - *
  • {@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)} - *
  • {@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)} - *
  • {@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)} - *
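A rough sketch of wiring these required calls into a job against a ZooKeeper-backed instance; the user, password, instance name, ZooKeeper hosts, and table name below are hypothetical placeholders, and PasswordToken, Job, and Configuration are assumed to be imported as usual:

    Job job = Job.getInstance(new Configuration());
    job.setOutputFormatClass(AccumuloOutputFormat.class);
    job.setOutputKeyClass(Text.class);       // table name per mutation (may be null if a default table is set)
    job.setOutputValueClass(Mutation.class);

    AccumuloOutputFormat.setConnectorInfo(job, "mruser", new PasswordToken("mrpass"));
    AccumuloOutputFormat.setZooKeeperInstance(job,
        ClientConfiguration.loadDefault().withInstance("myInstance").withZkHosts("zk1:2181,zk2:2181"));
    AccumuloOutputFormat.setDefaultTableName(job, "output_table");  // optional
    AccumuloOutputFormat.setCreateTables(job, true);                // optional
    AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig().setMaxMemory(50 * 1024 * 1024)); // optional

The map or reduce function then emits a Text table name together with a Mutation, e.g. context.write(new Text("output_table"), mutation), or a null table name to fall back to the configured default table.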
- * - * Other static methods are optional. - */ -public class AccumuloOutputFormat extends OutputFormat { - - private static final Class CLASS = AccumuloOutputFormat.class; - protected static final Logger log = Logger.getLogger(CLASS); - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) - * @param token - * the user's password - * @since 1.5.0 - */ - public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { - OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *

- * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param principal - * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) - * @param tokenFile - * the path to the token file - * @since 1.6.0 - */ - public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { - OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); - } - - /** - * Determines if the connector has been configured. - * - * @param context - * the Hadoop context for the configured job - * @return true if the connector has been configured, false otherwise - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static Boolean isConnectorInfoSet(JobContext context) { - return OutputConfigurator.isConnectorInfoSet(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Gets the user name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the user name - * @since 1.5.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - */ - protected static String getPrincipal(JobContext context) { - return OutputConfigurator.getPrincipal(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Gets the serialized token class from either the configuration or the token file. - * - * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. - */ - @Deprecated - protected static String getTokenClass(JobContext context) { - return getAuthenticationToken(context).getClass().getName(); - } - - /** - * Gets the serialized token from either the configuration or the token file. - * - * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. - */ - @Deprecated - protected static byte[] getToken(JobContext context) { - return AuthenticationTokenSerializer.serialize(getAuthenticationToken(context)); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. - * - * @param context - * the Hadoop context for the configured job - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Job, String, AuthenticationToken) - * @see #setConnectorInfo(Job, String, String) - */ - protected static AuthenticationToken getAuthenticationToken(JobContext context) { - return OutputConfigurator.getAuthenticationToken(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Configures a {@link ZooKeeperInstance} for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. - * @since 1.6.0 - */ - public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { - OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); - } - - /** - * Configures a {@link MockInstance} for this job. 
- * - * @param job - * the Hadoop job instance to be configured - * @param instanceName - * the Accumulo instance name - * @since 1.5.0 - */ - public static void setMockInstance(Job job, String instanceName) { - OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); - } - - /** - * Initializes an Accumulo {@link Instance} based on the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance - * @since 1.5.0 - * @see #setZooKeeperInstance(Job, ClientConfiguration) - * @see #setMockInstance(Job, String) - */ - protected static Instance getInstance(JobContext context) { - return OutputConfigurator.getInstance(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Sets the log level for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param level - * the logging level - * @since 1.5.0 - */ - public static void setLogLevel(Job job, Level level) { - OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); - } - - /** - * Gets the log level from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the log level - * @since 1.5.0 - * @see #setLogLevel(Job, Level) - */ - protected static Level getLogLevel(JobContext context) { - return OutputConfigurator.getLogLevel(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and - * underscores. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.5.0 - */ - public static void setDefaultTableName(Job job, String tableName) { - OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName); - } - - /** - * Gets the default table name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the default table name - * @since 1.5.0 - * @see #setDefaultTableName(Job, String) - */ - protected static String getDefaultTableName(JobContext context) { - return OutputConfigurator.getDefaultTableName(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is - * used. Setting the configuration multiple times overwrites any previous configuration. - * - * @param job - * the Hadoop job instance to be configured - * @param bwConfig - * the configuration for the {@link BatchWriter} - * @since 1.5.0 - */ - public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { - OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig); - } - - /** - * Gets the {@link BatchWriterConfig} settings. - * - * @param context - * the Hadoop context for the configured job - * @return the configuration object - * @since 1.5.0 - * @see #setBatchWriterOptions(Job, BatchWriterConfig) - */ - protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { - return OutputConfigurator.getBatchWriterOptions(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setCreateTables(Job job, boolean enableFeature) { - OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether tables are permitted to be created as needed. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is disabled, false otherwise - * @since 1.5.0 - * @see #setCreateTables(Job, boolean) - */ - protected static Boolean canCreateTables(JobContext context) { - return OutputConfigurator.canCreateTables(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setSimulationMode(Job job, boolean enableFeature) { - OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether this feature is enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setSimulationMode(Job, boolean) - */ - protected static Boolean getSimulationMode(JobContext context) { - return OutputConfigurator.getSimulationMode(CLASS, InputFormatBase.getConfiguration(context)); - } - - /** - * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. - */ - protected static class AccumuloRecordWriter extends RecordWriter { - private MultiTableBatchWriter mtbw = null; - private HashMap bws = null; - private Text defaultTableName = null; - - private boolean simulate = false; - private boolean createTables = false; - - private long mutCount = 0; - private long valCount = 0; - - private Connector conn; - - protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(context); - if (l != null) - log.setLevel(getLogLevel(context)); - this.simulate = getSimulationMode(context); - this.createTables = canCreateTables(context); - - if (simulate) - log.info("Simulating output only. No writes to tables will occur"); - - this.bws = new HashMap(); - - String tname = getDefaultTableName(context); - this.defaultTableName = (tname == null) ? null : new Text(tname); - - if (!simulate) { - this.conn = getInstance(context).getConnector(getPrincipal(context), getAuthenticationToken(context)); - mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context)); - } - } - - /** - * Push a mutation into a table. If table is null, the defaultTable will be used. If canCreateTable is set, the table will be created if it does not exist. - * The table name must only contain alphanumerics and underscore. - */ - @Override - public void write(Text table, Mutation mutation) throws IOException { - if (table == null || table.toString().isEmpty()) - table = this.defaultTableName; - - if (!simulate && table == null) - throw new IOException("No table or default table specified. 
Try simulation mode next time"); - - ++mutCount; - valCount += mutation.size(); - printMutation(table, mutation); - - if (simulate) - return; - - if (!bws.containsKey(table)) - try { - addTable(table); - } catch (Exception e) { - e.printStackTrace(); - throw new IOException(e); - } - - try { - bws.get(table).addMutation(mutation); - } catch (MutationsRejectedException e) { - throw new IOException(e); - } - } - - public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { - if (simulate) { - log.info("Simulating adding table: " + tableName); - return; - } - - log.debug("Adding table: " + tableName); - BatchWriter bw = null; - String table = tableName.toString(); - - if (createTables && !conn.tableOperations().exists(table)) { - try { - conn.tableOperations().create(table); - } catch (AccumuloSecurityException e) { - log.error("Accumulo security violation creating " + table, e); - throw e; - } catch (TableExistsException e) { - // Shouldn't happen - } - } - - try { - bw = mtbw.getBatchWriter(table); - } catch (TableNotFoundException e) { - log.error("Accumulo table " + table + " doesn't exist and cannot be created.", e); - throw new AccumuloException(e); - } catch (AccumuloException e) { - throw e; - } catch (AccumuloSecurityException e) { - throw e; - } - - if (bw != null) - bws.put(tableName, bw); - } - - private int printMutation(Text table, Mutation m) { - if (log.isTraceEnabled()) { - log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow()))); - for (ColumnUpdate cu : m.getUpdates()) { - log.trace(String.format("Table %s column: %s:%s", table, hexDump(cu.getColumnFamily()), hexDump(cu.getColumnQualifier()))); - log.trace(String.format("Table %s security: %s", table, new ColumnVisibility(cu.getColumnVisibility()).toString())); - log.trace(String.format("Table %s value: %s", table, hexDump(cu.getValue()))); - } - } - return m.getUpdates().size(); - } - - private String hexDump(byte[] ba) { - StringBuilder sb = new StringBuilder(); - for (byte b : ba) { - if ((b > 0x20) && (b < 0x7e)) - sb.append((char) b); - else - sb.append(String.format("x%02x", b)); - } - return sb.toString(); - } - - @Override - public void close(TaskAttemptContext attempt) throws IOException, InterruptedException { - log.debug("mutations written: " + mutCount + ", values written: " + valCount); - if (simulate) - return; - - try { - mtbw.close(); - } catch (MutationsRejectedException e) { - if (e.getAuthorizationFailuresMap().size() >= 0) { - HashMap> tables = new HashMap>(); - for (Entry> ke : e.getAuthorizationFailuresMap().entrySet()) { - Set secCodes = tables.get(ke.getKey().getTableId().toString()); - if (secCodes == null) { - secCodes = new HashSet(); - tables.put(ke.getKey().getTableId().toString(), secCodes); - } - secCodes.addAll(ke.getValue()); - } - - log.error("Not authorized to write to tables : " + tables); - } - - if (e.getConstraintViolationSummaries().size() > 0) { - log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); - } - } - } - } - - @Override - public void checkOutputSpecs(JobContext job) throws IOException { - if (!isConnectorInfoSet(job)) - throw new IOException("Connector info has not been set."); - try { - // if the instance isn't configured, it will complain here - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); - Connector c = getInstance(job).getConnector(principal, token); - if (!c.securityOperations().authenticateUser(principal, token)) - throw new 
IOException("Unable to authenticate user"); - } catch (AccumuloException e) { - throw new IOException(e); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - return new NullOutputFormat().getOutputCommitter(context); - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext attempt) throws IOException { - try { - return new AccumuloRecordWriter(attempt); - } catch (Exception e) { - throw new IOException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java deleted file mode 100644 index 37caf15..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.IOException; -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.PeekingIterator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} as keys, and a - * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. - * - * The user must specify the following via static configurator methods: - * - *

    - *
  • {@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} - *
  • {@link AccumuloRowInputFormat#setInputTableName(Job, String)} - *
  • {@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} - *
  • {@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)} - *
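As with the output format above, a hypothetical configuration sketch (names, hosts, and authorizations are placeholders); the map function then receives each row key as a Text and that row's cells as a PeekingIterator over Key/Value entries:

    Job job = Job.getInstance(new Configuration());
    job.setInputFormatClass(AccumuloRowInputFormat.class);

    AccumuloRowInputFormat.setConnectorInfo(job, "mruser", new PasswordToken("mrpass"));
    AccumuloRowInputFormat.setZooKeeperInstance(job,
        ClientConfiguration.loadDefault().withInstance("myInstance").withZkHosts("zk1:2181,zk2:2181"));
    AccumuloRowInputFormat.setInputTableName(job, "input_table");
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations("public"));

    // Mapper signature for this input format:
    // class RowMapper extends Mapper<Text, PeekingIterator<Entry<Key,Value>>, K2, V2> { ... }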
- * - * Other static methods are optional. - */ -public class AccumuloRowInputFormat extends InputFormatBase>> { - @Override - public RecordReader>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, - InterruptedException { - log.setLevel(getLogLevel(context)); - return new RecordReaderBase>>() { - RowIterator rowIterator; - - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { - super.initialize(inSplit, attempt); - rowIterator = new RowIterator(scannerIterator); - currentK = new Text(); - currentV = null; - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (!rowIterator.hasNext()) - return false; - currentV = new PeekingIterator>(rowIterator.next()); - numKeysRead = rowIterator.getKVCount(); - currentKey = currentV.peek().getKey(); - currentK = new Text(currentKey.getRow()); - return true; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java deleted file mode 100644 index e58e350..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.ClientSideIteratorScanner; -import org.apache.accumulo.core.client.IsolatedScanner; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.impl.TabletLocator; -import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs. - *

- * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V. - *

- * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its - * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V. - *

- * See {@link AccumuloInputFormat} for an example implementation. - */ -public abstract class InputFormatBase extends AbstractInputFormat { - - /** - * Gets the table name from the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return the table name - * @since 1.5.0 - * @see #setInputTableName(Job, String) - */ - protected static String getInputTableName(JobContext context) { - return InputConfigurator.getInputTableName(CLASS, getConfiguration(context)); - } - - /** - * Sets the name of the input table, over which this job will scan. - * - * @param job - * the Hadoop job instance to be configured - * @param tableName - * the table to use when the tablename is null in the write call - * @since 1.5.0 - */ - public static void setInputTableName(Job job, String tableName) { - InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName); - } - - /** - * Sets the input ranges to scan for the single input table associated with this job. - * - * @param job - * the Hadoop job instance to be configured - * @param ranges - * the ranges that will be mapped over - * @since 1.5.0 - */ - public static void setRanges(Job job, Collection ranges) { - InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges); - } - - /** - * Gets the ranges to scan over from a job. - * - * @param context - * the Hadoop context for the configured job - * @return the ranges - * @since 1.5.0 - * @see #setRanges(Job, Collection) - */ - protected static List getRanges(JobContext context) throws IOException { - return InputConfigurator.getRanges(CLASS, getConfiguration(context)); - } - - /** - * Restricts the columns that will be mapped over for this job for the default input table. - * - * @param job - * the Hadoop job instance to be configured - * @param columnFamilyColumnQualifierPairs - * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is - * selected. An empty set is the default and is equivalent to scanning the all columns. - * @since 1.5.0 - */ - public static void fetchColumns(Job job, Collection> columnFamilyColumnQualifierPairs) { - InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs); - } - - /** - * Gets the columns to be mapped over from this job. - * - * @param context - * the Hadoop context for the configured job - * @return a set of columns - * @since 1.5.0 - * @see #fetchColumns(Job, Collection) - */ - protected static Set> getFetchedColumns(JobContext context) { - return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context)); - } - - /** - * Encode an iterator on the single input table for this job. - * - * @param job - * the Hadoop job instance to be configured - * @param cfg - * the configuration of the iterator - * @since 1.5.0 - */ - public static void addIterator(Job job, IteratorSetting cfg) { - InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg); - } - - /** - * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. - * - * @param context - * the Hadoop context for the configured job - * @return a list of iterators - * @since 1.5.0 - * @see #addIterator(Job, IteratorSetting) - */ - protected static List getIterators(JobContext context) { - return InputConfigurator.getIterators(CLASS, getConfiguration(context)); - } - - /** - * Controls the automatic adjustment of ranges for this job. 
This feature merges overlapping ranges, then splits them to align with tablet boundaries. - * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * - * - *
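A small illustration of the behavior just described, using the concrete AccumuloInputFormat subclass and made-up row boundaries:

    // Overlapping ranges are merged and re-split on tablet boundaries by default.
    List<Range> ranges = Arrays.asList(new Range("row_000", "row_499"), new Range("row_400", "row_999"));
    AccumuloInputFormat.setRanges(job, ranges);
    // To force exactly one map task per supplied range instead:
    // AccumuloInputFormat.setAutoAdjustRanges(job, false);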

- * By default, this feature is enabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @see #setRanges(Job, Collection) - * @since 1.5.0 - */ - public static void setAutoAdjustRanges(Job job, boolean enableFeature) { - InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has auto-adjust ranges enabled. - * - * @param context - * the Hadoop context for the configured job - * @return false if the feature is disabled, true otherwise - * @since 1.5.0 - * @see #setAutoAdjustRanges(Job, boolean) - */ - protected static boolean getAutoAdjustRanges(JobContext context) { - return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context)); - } - - /** - * Controls the use of the {@link IsolatedScanner} in this job. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setScanIsolation(Job job, boolean enableFeature) { - InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has isolation enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setScanIsolation(Job, boolean) - */ - protected static boolean isIsolated(JobContext context) { - return InputConfigurator.isIsolated(CLASS, getConfiguration(context)); - } - - /** - * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map - * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setLocalIterators(Job job, boolean enableFeature) { - InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration uses local iterators. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setLocalIterators(Job, boolean) - */ - protected static boolean usesLocalIterators(JobContext context) { - return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context)); - } - - /** - *

- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the - * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will - * fail. - * - *

- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * - *

- * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be - * on the mapper's classpath. - * - *

- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map - * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The - * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. - * - *
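A sketch of that clone-and-take-offline workflow (table and user names are hypothetical, the Instance is assumed to already be available, and the usual checked exceptions are omitted):

    Connector conn = instance.getConnector("mruser", new PasswordToken("mrpass"));
    conn.tableOperations().compact("source_table", null, null, true, true); // optional: one file per tablet
    conn.tableOperations().clone("source_table", "source_table_clone", true,
        Collections.<String,String> emptyMap(), Collections.<String> emptySet());
    conn.tableOperations().offline("source_table_clone"); // take the clone offline before launching the job

    AccumuloInputFormat.setInputTableName(job, "source_table_clone");
    AccumuloInputFormat.setOfflineTableScan(job, true);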

- * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support - * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - * - *

- * By default, this feature is disabled. - * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise - * @since 1.5.0 - */ - public static void setOfflineTableScan(Job job, boolean enableFeature) { - InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature); - } - - /** - * Determines whether a configuration has the offline table scan feature enabled. - * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is enabled, false otherwise - * @since 1.5.0 - * @see #setOfflineTableScan(Job, boolean) - */ - protected static boolean isOfflineScan(JobContext context) { - return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context)); - } - - /** - * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. - * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo tablet locator - * @throws org.apache.accumulo.core.client.TableNotFoundException - * if the table name set on the configuration doesn't exist - * @since 1.5.0 - * @deprecated since 1.6.0 - */ - @Deprecated - protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); - } - - protected abstract static class RecordReaderBase extends AbstractRecordReader { - - /** - * Apply the configured iterators from the configuration to the scanner for the specified table name - * - * @param context - * the Hadoop context for the configured job - * @param scanner - * the scanner to configure - * @since 1.6.0 - */ - @Override - protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - setupIterators(context, scanner, split); - } - - /** - * Apply the configured iterators from the configuration to the scanner. - * - * @param context - * the Hadoop context for the configured job - * @param scanner - * the scanner to configure - */ - @Deprecated - protected void setupIterators(TaskAttemptContext context, Scanner scanner) { - setupIterators(context, scanner, null); - } - - /** - * Initialize a scanner over the given input split using this task attempt configuration. - */ - protected void setupIterators(TaskAttemptContext context, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - List iterators = null; - if (null == split) { - iterators = getIterators(context); - } else { - iterators = split.getIterators(); - if (null == iterators) { - iterators = getIterators(context); - } - } - for (IteratorSetting iterator : iterators) - scanner.addScanIterator(iterator); - } - } - - /** - * @deprecated since 1.5.2; Use {@link org.apache.accumulo.core.client.mapreduce.RangeInputSplit} instead. 
- * @see org.apache.accumulo.core.client.mapreduce.RangeInputSplit - */ - @Deprecated - public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit { - - public RangeInputSplit() { - super(); - } - - public RangeInputSplit(RangeInputSplit other) throws IOException { - super(other); - } - - protected RangeInputSplit(String table, Range range, String[] locations) { - super(table, "", range, locations); - } - - public RangeInputSplit(String table, String tableId, Range range, String[] locations) { - super(table, tableId, range, locations); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java deleted file mode 100644 index e59451e..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * This class to holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table. - */ -public class InputTableConfig implements Writable { - - private List iterators; - private List ranges; - private Collection> columns; - - private boolean autoAdjustRanges = true; - private boolean useLocalIterators = false; - private boolean useIsolatedScanners = false; - private boolean offlineScan = false; - - public InputTableConfig() {} - - /** - * Creates a batch scan config object out of a previously serialized batch scan config object. - * - * @param input - * the data input of the serialized batch scan config - */ - public InputTableConfig(DataInput input) throws IOException { - readFields(input); - } - - /** - * Sets the input ranges to scan for all tables associated with this job. 
This will be added to any per-table ranges that have been set using - * - * @param ranges - * the ranges that will be mapped over - * @since 1.6.0 - */ - public InputTableConfig setRanges(List ranges) { - this.ranges = ranges; - return this; - } - - /** - * Returns the ranges to be queried in the configuration - */ - public List getRanges() { - return ranges != null ? ranges : new ArrayList(); - } - - /** - * Restricts the columns that will be mapped over for this job for the default input table. - * - * @param columns - * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is - * selected. An empty set is the default and is equivalent to scanning the all columns. - * @since 1.6.0 - */ - public InputTableConfig fetchColumns(Collection> columns) { - this.columns = columns; - return this; - } - - /** - * Returns the columns to be fetched for this configuration - */ - public Collection> getFetchedColumns() { - return columns != null ? columns : new HashSet>(); - } - - /** - * Set iterators on to be used in the query. - * - * @param iterators - * the configurations for the iterators - * @since 1.6.0 - */ - public InputTableConfig setIterators(List iterators) { - this.iterators = iterators; - return this; - } - - /** - * Returns the iterators to be set on this configuration - */ - public List getIterators() { - return iterators != null ? iterators : new ArrayList(); - } - - /** - * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. - * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * - * - *

- * By default, this feature is enabled. - * - * @param autoAdjustRanges - * the feature is enabled if true, disabled otherwise - * @see #setRanges(java.util.List) - * @since 1.6.0 - */ - public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) { - this.autoAdjustRanges = autoAdjustRanges; - return this; - } - - /** - * Determines whether a configuration has auto-adjust ranges enabled. - * - * @return false if the feature is disabled, true otherwise - * @since 1.6.0 - * @see #setAutoAdjustRanges(boolean) - */ - public boolean shouldAutoAdjustRanges() { - return autoAdjustRanges; - } - - /** - * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack - * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be - * available on the classpath for the task. - * - *

- * By default, this feature is disabled. - * - * @param useLocalIterators - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setUseLocalIterators(boolean useLocalIterators) { - this.useLocalIterators = useLocalIterators; - return this; - } - - /** - * Determines whether a configuration uses local iterators. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setUseLocalIterators(boolean) - */ - public boolean shouldUseLocalIterators() { - return useLocalIterators; - } - - /** - *

- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the - * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will - * fail. - * - *

- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * - *

- * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be - * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard. - * - *

- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map - * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The - * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. - * - *
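For example, a per-table configuration for such an offline clone could be built with this class's fluent setters (hypothetical names; the map of configs is assumed here to be registered through the multi-table input format in this same package, AccumuloMultiTableInputFormat.setInputTableConfigs):

    InputTableConfig offlineClone = new InputTableConfig()
        .setRanges(Collections.singletonList(new Range("row_000", "row_999")))
        .setOfflineScan(true);

    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
    configs.put("source_table_clone", offlineClone);
    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);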

- * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support - * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - * - *

- * By default, this feature is disabled. - * - * @param offlineScan - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setOfflineScan(boolean offlineScan) { - this.offlineScan = offlineScan; - return this; - } - - /** - * Determines whether a configuration has the offline table scan feature enabled. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setOfflineScan(boolean) - */ - public boolean isOfflineScan() { - return offlineScan; - } - - /** - * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job. - * - *

- * By default, this feature is disabled. - * - * @param useIsolatedScanners - * the feature is enabled if true, disabled otherwise - * @since 1.6.0 - */ - public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) { - this.useIsolatedScanners = useIsolatedScanners; - return this; - } - - /** - * Determines whether a configuration has isolation enabled. - * - * @return true if the feature is enabled, false otherwise - * @since 1.6.0 - * @see #setUseIsolatedScanners(boolean) - */ - public boolean shouldUseIsolatedScanners() { - return useIsolatedScanners; - } - - /** - * Writes the state for the current object out to the specified {@link DataOutput} - * - * @param dataOutput - * the output for which to write the object's state - */ - @Override - public void write(DataOutput dataOutput) throws IOException { - if (iterators != null) { - dataOutput.writeInt(iterators.size()); - for (IteratorSetting setting : iterators) - setting.write(dataOutput); - } else { - dataOutput.writeInt(0); - } - if (ranges != null) { - dataOutput.writeInt(ranges.size()); - for (Range range : ranges) - range.write(dataOutput); - } else { - dataOutput.writeInt(0); - } - if (columns != null) { - dataOutput.writeInt(columns.size()); - for (Pair column : columns) { - if (column.getSecond() == null) { - dataOutput.writeInt(1); - column.getFirst().write(dataOutput); - } else { - dataOutput.writeInt(2); - column.getFirst().write(dataOutput); - column.getSecond().write(dataOutput); - } - } - } else { - dataOutput.writeInt(0); - } - dataOutput.writeBoolean(autoAdjustRanges); - dataOutput.writeBoolean(useLocalIterators); - dataOutput.writeBoolean(useIsolatedScanners); - } - - /** - * Reads the fields in the {@link DataInput} into the current object - * - * @param dataInput - * the input fields to read into the current object - */ - @Override - public void readFields(DataInput dataInput) throws IOException { - // load iterators - long iterSize = dataInput.readInt(); - if (iterSize > 0) - iterators = new ArrayList(); - for (int i = 0; i < iterSize; i++) - iterators.add(new IteratorSetting(dataInput)); - // load ranges - long rangeSize = dataInput.readInt(); - if (rangeSize > 0) - ranges = new ArrayList(); - for (int i = 0; i < rangeSize; i++) { - Range range = new Range(); - range.readFields(dataInput); - ranges.add(range); - } - // load columns - long columnSize = dataInput.readInt(); - if (columnSize > 0) - columns = new HashSet>(); - for (int i = 0; i < columnSize; i++) { - long numPairs = dataInput.readInt(); - Text colFam = new Text(); - colFam.readFields(dataInput); - if (numPairs == 1) { - columns.add(new Pair(colFam, null)); - } else if (numPairs == 2) { - Text colQual = new Text(); - colQual.readFields(dataInput); - columns.add(new Pair(colFam, colQual)); - } - } - autoAdjustRanges = dataInput.readBoolean(); - useLocalIterators = dataInput.readBoolean(); - useIsolatedScanners = dataInput.readBoolean(); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - InputTableConfig that = (InputTableConfig) o; - - if (autoAdjustRanges != that.autoAdjustRanges) - return false; - if (offlineScan != that.offlineScan) - return false; - if (useIsolatedScanners != that.useIsolatedScanners) - return false; - if (useLocalIterators != that.useLocalIterators) - return false; - if (columns != null ? !columns.equals(that.columns) : that.columns != null) - return false; - if (iterators != null ? 
!iterators.equals(that.iterators) : that.iterators != null) - return false; - if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) - return false; - return true; - } - - @Override - public int hashCode() { - int result = 31 * (iterators != null ? iterators.hashCode() : 0); - result = 31 * result + (ranges != null ? ranges.hashCode() : 0); - result = 31 * result + (columns != null ? columns.hashCode() : 0); - result = 31 * result + (autoAdjustRanges ? 1 : 0); - result = 31 * result + (useLocalIterators ? 1 : 0); - result = 31 * result + (useIsolatedScanners ? 1 : 0); - result = 31 * result + (offlineScan ? 1 : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java deleted file mode 100644 index 4b5a149..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.client.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; -import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.Pair; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.log4j.Level; - -/** - * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. - */ -public class RangeInputSplit extends InputSplit implements Writable { - private Range range; - private String[] locations; - private String tableId, tableName, instanceName, zooKeepers, principal; - private TokenSource tokenSource; - private String tokenFile; - private AuthenticationToken token; - private Boolean offline, mockInstance, isolatedScan, localIterators; - private Authorizations auths; - private Set> fetchedColumns; - private List iterators; - private Level level; - - public RangeInputSplit() { - range = new Range(); - locations = new String[0]; - tableName = ""; - tableId = ""; - } - - public RangeInputSplit(RangeInputSplit split) throws IOException { - this.setRange(split.getRange()); - this.setLocations(split.getLocations()); - this.setTableName(split.getTableName()); - this.setTableId(split.getTableId()); - } - - protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { - this.range = range; - setLocations(locations); - this.tableName = table; - this.tableId = tableId; - } - - public Range getRange() { - return range; - } - - private static byte[] extractBytes(ByteSequence seq, int numBytes) { - byte[] bytes = new byte[numBytes + 1]; - bytes[0] = 0; - for (int i = 0; i < numBytes; i++) { - if (i >= seq.length()) - bytes[i + 1] = 0; - else - bytes[i + 1] = seq.byteAt(i); - } - return bytes; - } - - public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { - int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); - BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); - BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); - BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); - return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); - } - - public float getProgress(Key currentKey) { - if (currentKey == null) - 
return 0f; - if (range.getStartKey() != null && range.getEndKey() != null) { - if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { - // just look at the row progress - return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { - // just look at the column family progress - return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); - } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { - // just look at the column qualifier progress - return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); - } - } - // if we can't figure it out, then claim no progress - return 0f; - } - - /** - * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. - */ - @Override - public long getLength() throws IOException { - Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); - Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); - int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); - long diff = 0; - - byte[] start = startRow.getBytes(); - byte[] stop = stopRow.getBytes(); - for (int i = 0; i < maxCommon; ++i) { - diff |= 0xff & (start[i] ^ stop[i]); - diff <<= Byte.SIZE; - } - - if (startRow.getLength() != stopRow.getLength()) - diff |= 0xff; - - return diff + 1; - } - - @Override - public String[] getLocations() throws IOException { - return Arrays.copyOf(locations, locations.length); - } - - @Override - public void readFields(DataInput in) throws IOException { - range.readFields(in); - tableName = in.readUTF(); - tableId = in.readUTF(); - int numLocs = in.readInt(); - locations = new String[numLocs]; - for (int i = 0; i < numLocs; ++i) - locations[i] = in.readUTF(); - - if (in.readBoolean()) { - isolatedScan = in.readBoolean(); - } - - if (in.readBoolean()) { - offline = in.readBoolean(); - } - - if (in.readBoolean()) { - localIterators = in.readBoolean(); - } - - if (in.readBoolean()) { - mockInstance = in.readBoolean(); - } - - if (in.readBoolean()) { - int numColumns = in.readInt(); - List columns = new ArrayList(numColumns); - for (int i = 0; i < numColumns; i++) { - columns.add(in.readUTF()); - } - - fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns); - } - - if (in.readBoolean()) { - String strAuths = in.readUTF(); - auths = new Authorizations(strAuths.getBytes(StandardCharsets.UTF_8)); - } - - if (in.readBoolean()) { - principal = in.readUTF(); - } - - if (in.readBoolean()) { - int ordinal = in.readInt(); - this.tokenSource = TokenSource.values()[ordinal]; - - switch (this.tokenSource) { - case INLINE: - String tokenClass = in.readUTF(); - byte[] base64TokenBytes = in.readUTF().getBytes(StandardCharsets.UTF_8); - byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes); - - this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes); - break; - - case FILE: - this.tokenFile = in.readUTF(); - - break; - default: - throw new IOException("Cannot parse unknown TokenSource ordinal"); - } - } - - if (in.readBoolean()) { - instanceName = in.readUTF(); - } - - if 
(in.readBoolean()) { - zooKeepers = in.readUTF(); - } - - if (in.readBoolean()) { - level = Level.toLevel(in.readInt()); - } - } - - @Override - public void write(DataOutput out) throws IOException { - range.write(out); - out.writeUTF(tableName); - out.writeUTF(tableId); - out.writeInt(locations.length); - for (int i = 0; i < locations.length; ++i) - out.writeUTF(locations[i]); - - out.writeBoolean(null != isolatedScan); - if (null != isolatedScan) { - out.writeBoolean(isolatedScan); - } - - out.writeBoolean(null != offline); - if (null != offline) { - out.writeBoolean(offline); - } - - out.writeBoolean(null != localIterators); - if (null != localIterators) { - out.writeBoolean(localIterators); - } - - out.writeBoolean(null != mockInstance); - if (null != mockInstance) { - out.writeBoolean(mockInstance); - } - - out.writeBoolean(null != fetchedColumns); - if (null != fetchedColumns) { - String[] cols = InputConfigurator.serializeColumns(fetchedColumns); - out.writeInt(cols.length); - for (String col : cols) { - out.writeUTF(col); - } - } - - out.writeBoolean(null != auths); - if (null != auths) { - out.writeUTF(auths.serialize()); - } - - out.writeBoolean(null != principal); - if (null != principal) { - out.writeUTF(principal); - } - - out.writeBoolean(null != tokenSource); - if (null != tokenSource) { - out.writeInt(tokenSource.ordinal()); - - if (null != token && null != tokenFile) { - throw new IOException("Cannot use both inline AuthenticationToken and file-based AuthenticationToken"); - } else if (null != token) { - out.writeUTF(token.getClass().getCanonicalName()); - out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); - } else { - out.writeUTF(tokenFile); - } - } - - out.writeBoolean(null != instanceName); - if (null != instanceName) { - out.writeUTF(instanceName); - } - - out.writeBoolean(null != zooKeepers); - if (null != zooKeepers) { - out.writeUTF(zooKeepers); - } - - out.writeBoolean(null != level); - if (null != level) { - out.writeInt(level.toInt()); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(256); - sb.append("Range: ").append(range); - sb.append(" Locations: ").append(Arrays.asList(locations)); - sb.append(" Table: ").append(tableName); - sb.append(" TableID: ").append(tableId); - sb.append(" InstanceName: ").append(instanceName); - sb.append(" zooKeepers: ").append(zooKeepers); - sb.append(" principal: ").append(principal); - sb.append(" tokenSource: ").append(tokenSource); - sb.append(" authenticationToken: ").append(token); - sb.append(" authenticationTokenFile: ").append(tokenFile); - sb.append(" Authorizations: ").append(auths); - sb.append(" offlineScan: ").append(offline); - sb.append(" mockInstance: ").append(mockInstance); - sb.append(" isolatedScan: ").append(isolatedScan); - sb.append(" localIterators: ").append(localIterators); - sb.append(" fetchColumns: ").append(fetchedColumns); - sb.append(" iterators: ").append(iterators); - sb.append(" logLevel: ").append(level); - return sb.toString(); - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String table) { - this.tableName = table; - } - - public void setTableId(String tableId) { - this.tableId = tableId; - } - - public String getTableId() { - return tableId; - } - - public Instance getInstance() { - if (null == instanceName) { - return null; - } - - if (isMockInstance()) { - return new MockInstance(getInstanceName()); - } - - if (null == zooKeepers) { - return null; - } - - return new 
ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(getInstanceName()).withZkHosts(getZooKeepers())); - } - - public String getInstanceName() { - return instanceName; - } - - public void setInstanceName(String instanceName) { - this.instanceName = instanceName; - } - - public String getZooKeepers() { - return zooKeepers; - } - - public void setZooKeepers(String zooKeepers) { - this.zooKeepers = zooKeepers; - } - - public String getPrincipal() { - return principal; - } - - public void setPrincipal(String principal) { - this.principal = principal; - } - - public AuthenticationToken getToken() { - return token; - } - - public void setToken(AuthenticationToken token) { - this.tokenSource = TokenSource.INLINE; - this.token = token; - } - - public void setToken(String tokenFile) { - this.tokenSource = TokenSource.FILE; - this.tokenFile = tokenFile; - } - - public Boolean isOffline() { - return offline; - } - - public void setOffline(Boolean offline) { - this.offline = offline; - } - - public void setLocations(String[] locations) { - this.locations = Arrays.copyOf(locations, locations.length); - } - - public Boolean isMockInstance() { - return mockInstance; - } - - public void setMockInstance(Boolean mockInstance) { - this.mockInstance = mockInstance; - } - - public Boolean isIsolatedScan() { - return isolatedScan; - } - - public void setIsolatedScan(Boolean isolatedScan) { - this.isolatedScan = isolatedScan; - } - - public Authorizations getAuths() { - return auths; - } - - public void setAuths(Authorizations auths) { - this.auths = auths; - } - - public void setRange(Range range) { - this.range = range; - } - - public Boolean usesLocalIterators() { - return localIterators; - } - - public void setUsesLocalIterators(Boolean localIterators) { - this.localIterators = localIterators; - } - - public Set> getFetchedColumns() { - return fetchedColumns; - } - - public void setFetchedColumns(Collection> fetchedColumns) { - this.fetchedColumns = new HashSet>(); - for (Pair columns : fetchedColumns) { - this.fetchedColumns.add(columns); - } - } - - public void setFetchedColumns(Set> fetchedColumns) { - this.fetchedColumns = fetchedColumns; - } - - public List getIterators() { - return iterators; - } - - public void setIterators(List iterators) { - this.iterators = iterators; - } - - public Level getLogLevel() { - return level; - } - - public void setLogLevel(Level level) { - this.level = level; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java deleted file mode 100644 index 4610556..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java +++ /dev/null @@ -1,369 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; -import org.apache.accumulo.core.security.Credentials; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -/** - * @since 1.6.0 - */ -public class ConfiguratorBase { - - /** - * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}. - * - * @since 1.6.0 - */ - public static enum ConnectorInfo { - IS_CONFIGURED, PRINCIPAL, TOKEN, - } - - public static enum TokenSource { - FILE, INLINE; - - private String prefix; - - private TokenSource() { - prefix = name().toLowerCase() + ":"; - } - - public String prefix() { - return prefix; - } - } - - /** - * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}. - * - * @since 1.6.0 - */ - public static enum InstanceOpts { - TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG; - } - - /** - * Configuration keys for general configuration options. - * - * @since 1.6.0 - */ - public static enum GeneralOpts { - LOG_LEVEL - } - - /** - * Provides a configuration key for a given feature enum, prefixed by the implementingClass - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param e - * the enum used to provide the unique part of the configuration key - * @return the configuration key - * @since 1.6.0 - */ - protected static String enumToConfKey(Class implementingClass, Enum e) { - return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase()); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *
- * <p>
- * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param principal - * a valid Accumulo user name - * @param token - * the user's password - * @since 1.6.0 - */ - public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, AuthenticationToken token) - throws AccumuloSecurityException { - if (isConnectorInfoSet(implementingClass, conf)) - throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); - - checkArgument(principal != null, "principal is null"); - checkArgument(token != null, "token is null"); - conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), - TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); - } - - /** - * Sets the connector information needed to communicate with Accumulo in this job. - * - *
- * <p>
- * Pulls a token file into the Distributed Cache that contains the authentication token in an attempt to be more secure than storing the password in the - * Configuration. Token file created with "bin/accumulo create-token". - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param principal - * a valid Accumulo user name - * @param tokenFile - * the path to the token file in DFS - * @since 1.6.0 - */ - public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, String tokenFile) throws AccumuloSecurityException { - if (isConnectorInfoSet(implementingClass, conf)) - throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); - - checkArgument(principal != null, "principal is null"); - checkArgument(tokenFile != null, "tokenFile is null"); - - try { - DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf); - } catch (URISyntaxException e) { - throw new IllegalStateException("Unable to add tokenFile \"" + tokenFile + "\" to distributed cache."); - } - - conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); - conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.FILE.prefix() + tokenFile); - } - - /** - * Determines if the connector info has already been set for this instance. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return true if the connector info has already been set, false otherwise - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static Boolean isConnectorInfoSet(Class implementingClass, Configuration conf) { - return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false); - } - - /** - * Gets the user name from the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the principal - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static String getPrincipal(Class implementingClass, Configuration conf) { - return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL)); - } - - /** - * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the principal's authentication token - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - * @see #setConnectorInfo(Class, Configuration, String, String) - */ - public static AuthenticationToken getAuthenticationToken(Class implementingClass, Configuration conf) { - String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN)); - if (token == null || token.isEmpty()) - return null; - if (token.startsWith(TokenSource.INLINE.prefix())) { - String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2); - if (args.length == 2) - return AuthenticationTokenSerializer.deserialize(args[0], Base64.decodeBase64(args[1].getBytes(StandardCharsets.UTF_8))); - } else if (token.startsWith(TokenSource.FILE.prefix())) { - String tokenFileName = token.substring(TokenSource.FILE.prefix().length()); - return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName); - } - - throw new IllegalStateException("Token was not properly serialized into the configuration"); - } - - /** - * Reads from the token file in distributed cache. Currently, the token file stores data separated by colons e.g. principal:token_class:token - * - * @param conf - * the Hadoop context for the configured job - * @return path to the token file as a String - * @since 1.6.0 - * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) - */ - public static AuthenticationToken getTokenFromFile(Configuration conf, String principal, String tokenFile) { - FSDataInputStream in = null; - try { - URI[] uris = DistributedCacheHelper.getCacheFiles(conf); - Path path = null; - for (URI u : uris) { - if (u.toString().equals(tokenFile)) { - path = new Path(u); - } - } - if (path == null) { - throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + "\" in cache."); - } - FileSystem fs = FileSystem.get(conf); - in = fs.open(path); - } catch (IOException e) { - throw new IllegalArgumentException("Couldn't open password file called \"" + tokenFile + "\"."); - } - try (java.util.Scanner fileScanner = new java.util.Scanner(in)) { - while (fileScanner.hasNextLine()) { - Credentials creds = Credentials.deserialize(fileScanner.nextLine()); - if (principal.equals(creds.getPrincipal())) { - return creds.getToken(); - } - } - throw new IllegalArgumentException("Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\""); - } - } - - /** - * Configures a {@link ZooKeeperInstance} for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param clientConfig - * client configuration for specifying connection timeouts, SSL connection options, etc. 
- * @since 1.6.0 - */ - public static void setZooKeeperInstance(Class implementingClass, Configuration conf, ClientConfiguration clientConfig) { - String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); - if (!conf.get(key, "").isEmpty()) - throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); - conf.set(key, "ZooKeeperInstance"); - if (clientConfig != null) { - conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG), clientConfig.serialize()); - } - } - - /** - * Configures a {@link MockInstance} for this job. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param instanceName - * the Accumulo instance name - * @since 1.6.0 - */ - public static void setMockInstance(Class implementingClass, Configuration conf, String instanceName) { - String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); - if (!conf.get(key, "").isEmpty()) - throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); - conf.set(key, "MockInstance"); - - checkArgument(instanceName != null, "instanceName is null"); - conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName); - } - - /** - * Initializes an Accumulo {@link Instance} based on the configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return an Accumulo instance - * @since 1.6.0 - * @see #setZooKeeperInstance(Class, Configuration, ClientConfiguration) - * @see #setMockInstance(Class, Configuration, String) - */ - public static Instance getInstance(Class implementingClass, Configuration conf) { - String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), ""); - if ("MockInstance".equals(instanceType)) - return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME))); - else if ("ZooKeeperInstance".equals(instanceType)) { - String clientConfigString = conf.get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG)); - if (clientConfigString == null) { - String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)); - String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS)); - return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers)); - } else { - return new ZooKeeperInstance(ClientConfiguration.deserialize(clientConfigString)); - } - } else if (instanceType.isEmpty()) - throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName()); - else - throw new IllegalStateException("Unrecognized instance type " + instanceType); - } - - /** - * Sets the log level for this job. 
- * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @param level - * the logging level - * @since 1.6.0 - */ - public static void setLogLevel(Class implementingClass, Configuration conf, Level level) { - checkArgument(level != null, "level is null"); - Logger.getLogger(implementingClass).setLevel(level); - conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt()); - } - - /** - * Gets the log level from this configuration. - * - * @param implementingClass - * the class whose name will be used as a prefix for the property configuration key - * @param conf - * the Hadoop configuration object to configure - * @return the log level - * @since 1.6.0 - * @see #setLogLevel(Class, Configuration, Level) - */ - public static Level getLogLevel(Class implementingClass, Configuration conf) { - return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt())); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8577a1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java deleted file mode 100644 index c694b9a..0000000 --- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.mapreduce.lib.impl; - -import java.io.IOException; -import java.net.URI; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.Path; - -/** - * @since 1.6.0 - */ -@SuppressWarnings("deprecation") -public class DistributedCacheHelper { - - /** - * @since 1.6.0 - */ - public static void addCacheFile(URI uri, Configuration conf) { - DistributedCache.addCacheFile(uri, conf); - } - - /** - * @since 1.6.0 - */ - public static URI[] getCacheFiles(Configuration conf) throws IOException { - return DistributedCache.getCacheFiles(conf); - } - - /** - * @since 1.6.0 - */ - public static Path[] getLocalCacheFiles(Configuration conf) throws IOException { - return DistributedCache.getLocalCacheFiles(conf); - } -}
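For context on the ConfiguratorBase helpers removed above: the following is a minimal, hypothetical sketch (not part of the reverted patch) showing how those static configurators are typically driven against a Hadoop Configuration. The driver class ConfiguratorSketch, the prefix class choice, the principal "mapreduce_user", the PasswordToken value, and the instance name/ZooKeeper hosts are all made-up placeholders; in the real code the implementing class passed as the key prefix is the input/output format itself.

import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;

public class ConfiguratorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Every key is prefixed with the simple name of the "implementing class";
    // this prefix class is a stand-in for the real input/output format.
    Class<?> prefix = ConfiguratorSketch.class;

    // Inline token: BASE64-serialized into the Configuration (not secure, per the javadoc).
    AuthenticationToken token = new PasswordToken("hunter2"); // hypothetical credentials
    ConfiguratorBase.setConnectorInfo(prefix, conf, "mapreduce_user", token);

    // ZooKeeperInstance via a serialized ClientConfiguration; name and hosts are hypothetical.
    ClientConfiguration clientConf =
        ClientConfiguration.loadDefault().withInstance("test").withZkHosts("localhost:2181");
    ConfiguratorBase.setZooKeeperInstance(prefix, conf, clientConf);

    ConfiguratorBase.setLogLevel(prefix, conf, Level.INFO);

    // Later (e.g., inside a RecordReader or RecordWriter) the same keys are read back.
    String principal = ConfiguratorBase.getPrincipal(prefix, conf);
    AuthenticationToken readBack = ConfiguratorBase.getAuthenticationToken(prefix, conf);
    Instance instance = ConfiguratorBase.getInstance(prefix, conf); // rebuilt from the client config
    System.out.println(principal + " / " + readBack.getClass().getSimpleName()
        + " / " + instance.getClass().getSimpleName());
  }
}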
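Likewise, the getProgress(ByteSequence, ByteSequence, ByteSequence) interpolation in the removed RangeInputSplit can be exercised on its own. The sketch below is hypothetical (class name and row values are made up): the byte sequences are taken over a common number of leading bytes, prefixed with a 0x00 sign byte, read as BigIntegers, and linearly interpolated, which for these rows yields ('m' - 'a') / ('z' - 'a') = 12/25 = 0.48.

import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;

public class ProgressSketch {
  public static void main(String[] args) {
    // Start/end rows of a split and the row currently being read; values are illustrative.
    ByteSequence start = new ArrayByteSequence("row_a");
    ByteSequence end = new ArrayByteSequence("row_z");
    ByteSequence position = new ArrayByteSequence("row_m");

    // Prints 0.48: the rows differ only in their last byte, so the interpolation
    // reduces to (109 - 97) / (122 - 97).
    System.out.println(RangeInputSplit.getProgress(start, end, position));
  }
}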