From: mdrob@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Mon, 21 Apr 2014 23:48:06 -0000
Message-Id: <4b3142ebae94445492f3ee0694721e4d@git.apache.org>
In-Reply-To: <240dd757dcce408aa794d95887da7cc2@git.apache.org>
References: <240dd757dcce408aa794d95887da7cc2@git.apache.org>
Subject: [04/11] ACCUMULO-1880 create mapreduce module

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java new file mode 100644 index 0000000..af9bbae --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table + * name) and {@link Mutation} from the Map and Reduce functions. + * + * The user must specify the following via static configurator methods: + * + *
    + *
  • {@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + *
  • {@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)} + *
  • {@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)} + *
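+ * <p>
+ * For example, the required methods above might be invoked as in the following sketch (the principal, password, instance, ZooKeeper hosts, and table name are placeholders, and a Job instance named job is assumed):
+ *
+ * <pre>
+ * job.setOutputFormatClass(AccumuloOutputFormat.class);
+ * AccumuloOutputFormat.setConnectorInfo(job, "mruser", new PasswordToken("mrpass"));
+ * AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance("myInstance").withZkHosts("zk1:2181,zk2:2181"));
+ * AccumuloOutputFormat.setDefaultTableName(job, "output_table");
+ * AccumuloOutputFormat.setCreateTables(job, true);
+ * </pre>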
+ * + * Other static methods are optional. + */ +public class AccumuloOutputFormat extends OutputFormat { + + private static final Class CLASS = AccumuloOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *

+ * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) + * @param token + * the user's password + * @since 1.5.0 + */ + public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { + OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *

+ * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) + * @param tokenFile + * the path to the token file + * @since 1.6.0 + */ + public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { + OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); + } + + /** + * Determines if the connector has been configured. + * + * @param context + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + */ + protected static Boolean isConnectorInfoSet(JobContext context) { + return OutputConfigurator.isConnectorInfoSet(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Gets the user name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + */ + protected static String getPrincipal(JobContext context) { + return OutputConfigurator.getPrincipal(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Gets the serialized token class from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. + */ + @Deprecated + protected static String getTokenClass(JobContext context) { + return getAuthenticationToken(context).getClass().getName(); + } + + /** + * Gets the serialized token from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobContext)} instead. + */ + @Deprecated + protected static byte[] getToken(JobContext context) { + return AuthenticationTokenSerializer.serialize(getAuthenticationToken(context)); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. + * + * @param context + * the Hadoop context for the configured job + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(Job, String, AuthenticationToken) + * @see #setConnectorInfo(Job, String, String) + */ + protected static AuthenticationToken getAuthenticationToken(JobContext context) { + return OutputConfigurator.getAuthenticationToken(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Configures a {@link ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param clientConfig + * client configuration for specifying connection timeouts, SSL connection options, etc. + * @since 1.6.0 + */ + public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { + OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); + } + + /** + * Configures a {@link MockInstance} for this job. 
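+ * <p>
+ * For example, a unit test might configure the job as follows (the instance name and credentials are illustrative, and a Job named job is assumed):
+ *
+ * <pre>
+ * AccumuloOutputFormat.setMockInstance(job, "testInstance");
+ * AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
+ * </pre>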
+ * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(Job job, String instanceName) { + OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); + } + + /** + * Initializes an Accumulo {@link Instance} based on the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(Job, ClientConfiguration) + * @see #setMockInstance(Job, String) + */ + protected static Instance getInstance(JobContext context) { + return OutputConfigurator.getInstance(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(Job job, Level level) { + OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); + } + + /** + * Gets the log level from this configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(Job, Level) + */ + protected static Level getLogLevel(JobContext context) { + return OutputConfigurator.getLogLevel(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and + * underscores. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table to use when the tablename is null in the write call + * @since 1.5.0 + */ + public static void setDefaultTableName(Job job, String tableName) { + OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName); + } + + /** + * Gets the default table name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the default table name + * @since 1.5.0 + * @see #setDefaultTableName(Job, String) + */ + protected static String getDefaultTableName(JobContext context) { + return OutputConfigurator.getDefaultTableName(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is + * used. Setting the configuration multiple times overwrites any previous configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param bwConfig + * the configuration for the {@link BatchWriter} + * @since 1.5.0 + */ + public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { + OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig); + } + + /** + * Gets the {@link BatchWriterConfig} settings. + * + * @param context + * the Hadoop context for the configured job + * @return the configuration object + * @since 1.5.0 + * @see #setBatchWriterOptions(Job, BatchWriterConfig) + */ + protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { + return OutputConfigurator.getBatchWriterOptions(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores. + * + *

+ * By default, this feature is disabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setCreateTables(Job job, boolean enableFeature) { + OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether tables are permitted to be created as needed. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is disabled, false otherwise + * @since 1.5.0 + * @see #setCreateTables(Job, boolean) + */ + protected static Boolean canCreateTables(JobContext context) { + return OutputConfigurator.canCreateTables(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. + * + *

+ * By default, this feature is disabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setSimulationMode(Job job, boolean enableFeature) { + OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether this feature is enabled. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setSimulationMode(Job, boolean) + */ + protected static Boolean getSimulationMode(JobContext context) { + return OutputConfigurator.getSimulationMode(CLASS, InputFormatBase.getConfiguration(context)); + } + + /** + * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. + */ + protected static class AccumuloRecordWriter extends RecordWriter { + private MultiTableBatchWriter mtbw = null; + private HashMap bws = null; + private Text defaultTableName = null; + + private boolean simulate = false; + private boolean createTables = false; + + private long mutCount = 0; + private long valCount = 0; + + private Connector conn; + + protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException { + Level l = getLogLevel(context); + if (l != null) + log.setLevel(getLogLevel(context)); + this.simulate = getSimulationMode(context); + this.createTables = canCreateTables(context); + + if (simulate) + log.info("Simulating output only. No writes to tables will occur"); + + this.bws = new HashMap(); + + String tname = getDefaultTableName(context); + this.defaultTableName = (tname == null) ? null : new Text(tname); + + if (!simulate) { + this.conn = getInstance(context).getConnector(getPrincipal(context), getAuthenticationToken(context)); + mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context)); + } + } + + /** + * Push a mutation into a table. If table is null, the defaultTable will be used. If canCreateTable is set, the table will be created if it does not exist. + * The table name must only contain alphanumerics and underscore. + */ + @Override + public void write(Text table, Mutation mutation) throws IOException { + if (table == null || table.toString().isEmpty()) + table = this.defaultTableName; + + if (!simulate && table == null) + throw new IOException("No table or default table specified. 
Try simulation mode next time"); + + ++mutCount; + valCount += mutation.size(); + printMutation(table, mutation); + + if (simulate) + return; + + if (!bws.containsKey(table)) + try { + addTable(table); + } catch (Exception e) { + e.printStackTrace(); + throw new IOException(e); + } + + try { + bws.get(table).addMutation(mutation); + } catch (MutationsRejectedException e) { + throw new IOException(e); + } + } + + public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { + if (simulate) { + log.info("Simulating adding table: " + tableName); + return; + } + + log.debug("Adding table: " + tableName); + BatchWriter bw = null; + String table = tableName.toString(); + + if (createTables && !conn.tableOperations().exists(table)) { + try { + conn.tableOperations().create(table); + } catch (AccumuloSecurityException e) { + log.error("Accumulo security violation creating " + table, e); + throw e; + } catch (TableExistsException e) { + // Shouldn't happen + } + } + + try { + bw = mtbw.getBatchWriter(table); + } catch (TableNotFoundException e) { + log.error("Accumulo table " + table + " doesn't exist and cannot be created.", e); + throw new AccumuloException(e); + } catch (AccumuloException e) { + throw e; + } catch (AccumuloSecurityException e) { + throw e; + } + + if (bw != null) + bws.put(tableName, bw); + } + + private int printMutation(Text table, Mutation m) { + if (log.isTraceEnabled()) { + log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow()))); + for (ColumnUpdate cu : m.getUpdates()) { + log.trace(String.format("Table %s column: %s:%s", table, hexDump(cu.getColumnFamily()), hexDump(cu.getColumnQualifier()))); + log.trace(String.format("Table %s security: %s", table, new ColumnVisibility(cu.getColumnVisibility()).toString())); + log.trace(String.format("Table %s value: %s", table, hexDump(cu.getValue()))); + } + } + return m.getUpdates().size(); + } + + private String hexDump(byte[] ba) { + StringBuilder sb = new StringBuilder(); + for (byte b : ba) { + if ((b > 0x20) && (b < 0x7e)) + sb.append((char) b); + else + sb.append(String.format("x%02x", b)); + } + return sb.toString(); + } + + @Override + public void close(TaskAttemptContext attempt) throws IOException, InterruptedException { + log.debug("mutations written: " + mutCount + ", values written: " + valCount); + if (simulate) + return; + + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + if (e.getAuthorizationFailuresMap().size() >= 0) { + HashMap> tables = new HashMap>(); + for (Entry> ke : e.getAuthorizationFailuresMap().entrySet()) { + Set secCodes = tables.get(ke.getKey().getTableId().toString()); + if (secCodes == null) { + secCodes = new HashSet(); + tables.put(ke.getKey().getTableId().toString(), secCodes); + } + secCodes.addAll(ke.getValue()); + } + + log.error("Not authorized to write to tables : " + tables); + } + + if (e.getConstraintViolationSummaries().size() > 0) { + log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); + } + } + } + } + + @Override + public void checkOutputSpecs(JobContext job) throws IOException { + if (!isConnectorInfoSet(job)) + throw new IOException("Connector info has not been set."); + try { + // if the instance isn't configured, it will complain here + String principal = getPrincipal(job); + AuthenticationToken token = getAuthenticationToken(job); + Connector c = getInstance(job).getConnector(principal, token); + if (!c.securityOperations().authenticateUser(principal, token)) + throw new 
IOException("Unable to authenticate user"); + } catch (AccumuloException e) { + throw new IOException(e); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new NullOutputFormat().getOutputCommitter(context); + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext attempt) throws IOException { + try { + return new AccumuloRecordWriter(attempt); + } catch (Exception e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java new file mode 100644 index 0000000..37caf15 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} as keys, and a + * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. + * + * The user must specify the following via static configurator methods: + * + *

    + *
  • {@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + *
  • {@link AccumuloRowInputFormat#setInputTableName(Job, String)} + *
  • {@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} + *
  • {@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)} + *
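+ * <p>
+ * For example, the required methods above might be applied as in this sketch (credentials, instance, authorizations, and table name are placeholders; a Job named job is assumed):
+ *
+ * <pre>
+ * job.setInputFormatClass(AccumuloRowInputFormat.class);
+ * AccumuloRowInputFormat.setConnectorInfo(job, "mruser", new PasswordToken("mrpass"));
+ * AccumuloRowInputFormat.setInputTableName(job, "input_table");
+ * AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations("public"));
+ * AccumuloRowInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance("myInstance").withZkHosts("zk1:2181,zk2:2181"));
+ * </pre>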
+ * + * Other static methods are optional. + */ +public class AccumuloRowInputFormat extends InputFormatBase>> { + @Override + public RecordReader>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + log.setLevel(getLogLevel(context)); + return new RecordReaderBase>>() { + RowIterator rowIterator; + + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { + super.initialize(inSplit, attempt); + rowIterator = new RowIterator(scannerIterator); + currentK = new Text(); + currentV = null; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (!rowIterator.hasNext()) + return false; + currentV = new PeekingIterator>(rowIterator.next()); + numKeysRead = rowIterator.getKVCount(); + currentKey = currentV.peek().getKey(); + currentK = new Text(currentKey.getRow()); + return true; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java new file mode 100644 index 0000000..e58e350 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs. + *

+ * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V. + *

+ * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its + * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V. + *
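+ * <p>
+ * A minimal subclass sketch, modeled on {@link AccumuloInputFormat} (the class name KeyValueInputFormat is illustrative; trace logging and error handling are omitted):
+ *
+ * <pre>
+ * public class KeyValueInputFormat extends InputFormatBase<Key,Value> {
+ *   @Override
+ *   public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) {
+ *     return new RecordReaderBase<Key,Value>() {
+ *       @Override
+ *       public boolean nextKeyValue() {
+ *         if (!scannerIterator.hasNext())
+ *           return false;
+ *         ++numKeysRead;
+ *         Map.Entry<Key,Value> entry = scannerIterator.next();
+ *         currentK = currentKey = entry.getKey();
+ *         currentV = entry.getValue();
+ *         return true;
+ *       }
+ *     };
+ *   }
+ * }
+ * </pre>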

+ * See {@link AccumuloInputFormat} for an example implementation. + */ +public abstract class InputFormatBase extends AbstractInputFormat { + + /** + * Gets the table name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the table name + * @since 1.5.0 + * @see #setInputTableName(Job, String) + */ + protected static String getInputTableName(JobContext context) { + return InputConfigurator.getInputTableName(CLASS, getConfiguration(context)); + } + + /** + * Sets the name of the input table, over which this job will scan. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table to use when the tablename is null in the write call + * @since 1.5.0 + */ + public static void setInputTableName(Job job, String tableName) { + InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName); + } + + /** + * Sets the input ranges to scan for the single input table associated with this job. + * + * @param job + * the Hadoop job instance to be configured + * @param ranges + * the ranges that will be mapped over + * @since 1.5.0 + */ + public static void setRanges(Job job, Collection ranges) { + InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges); + } + + /** + * Gets the ranges to scan over from a job. + * + * @param context + * the Hadoop context for the configured job + * @return the ranges + * @since 1.5.0 + * @see #setRanges(Job, Collection) + */ + protected static List getRanges(JobContext context) throws IOException { + return InputConfigurator.getRanges(CLASS, getConfiguration(context)); + } + + /** + * Restricts the columns that will be mapped over for this job for the default input table. + * + * @param job + * the Hadoop job instance to be configured + * @param columnFamilyColumnQualifierPairs + * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is + * selected. An empty set is the default and is equivalent to scanning the all columns. + * @since 1.5.0 + */ + public static void fetchColumns(Job job, Collection> columnFamilyColumnQualifierPairs) { + InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs); + } + + /** + * Gets the columns to be mapped over from this job. + * + * @param context + * the Hadoop context for the configured job + * @return a set of columns + * @since 1.5.0 + * @see #fetchColumns(Job, Collection) + */ + protected static Set> getFetchedColumns(JobContext context) { + return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context)); + } + + /** + * Encode an iterator on the single input table for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param cfg + * the configuration of the iterator + * @since 1.5.0 + */ + public static void addIterator(Job job, IteratorSetting cfg) { + InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg); + } + + /** + * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. + * + * @param context + * the Hadoop context for the configured job + * @return a list of iterators + * @since 1.5.0 + * @see #addIterator(Job, IteratorSetting) + */ + protected static List getIterators(JobContext context) { + return InputConfigurator.getIterators(CLASS, getConfiguration(context)); + } + + /** + * Controls the automatic adjustment of ranges for this job. 
This feature merges overlapping ranges, then splits them to align with tablet boundaries. + * Disabling this feature will cause exactly one Map task to be created for each specified range. + * + *

+ * By default, this feature is enabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @see #setRanges(Job, Collection) + * @since 1.5.0 + */ + public static void setAutoAdjustRanges(Job job, boolean enableFeature) { + InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether a configuration has auto-adjust ranges enabled. + * + * @param context + * the Hadoop context for the configured job + * @return false if the feature is disabled, true otherwise + * @since 1.5.0 + * @see #setAutoAdjustRanges(Job, boolean) + */ + protected static boolean getAutoAdjustRanges(JobContext context) { + return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context)); + } + + /** + * Controls the use of the {@link IsolatedScanner} in this job. + * + *

+ * By default, this feature is disabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setScanIsolation(Job job, boolean enableFeature) { + InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether a configuration has isolation enabled. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setScanIsolation(Job, boolean) + */ + protected static boolean isIsolated(JobContext context) { + return InputConfigurator.isIsolated(CLASS, getConfiguration(context)); + } + + /** + * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map + * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. + * + *
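+ * <p>
+ * For example, to run a configured iterator inside the map task instead of on the tablet servers (the iterator, its priority, and the pattern are illustrative; a Job named job is assumed):
+ *
+ * <pre>
+ * IteratorSetting regex = new IteratorSetting(50, "rowMatcher", RegExFilter.class);
+ * RegExFilter.setRegexs(regex, "row_.*", null, null, null, false);
+ * AccumuloInputFormat.addIterator(job, regex);
+ * AccumuloInputFormat.setLocalIterators(job, true);
+ * </pre>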

+ * By default, this feature is disabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setLocalIterators(Job job, boolean enableFeature) { + InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether a configuration uses local iterators. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setLocalIterators(Job, boolean) + */ + protected static boolean usesLocalIterators(JobContext context) { + return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context)); + } + + /** + *

+ * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the + * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will + * fail. + * + *

+ * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. + * + *

+ * Reading the offline table will create the scan-time iterator stack in the map process, so any iterators that are configured for the table will need to be + * on the mapper's classpath. + * + *

+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map + * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The + * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. + * + *
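+ * <p>
+ * A sketch of the clone-and-take-offline pattern described above (the table names are placeholders and conn is an existing Connector):
+ *
+ * <pre>
+ * conn.tableOperations().clone("mytable", "mytable_mr", true, Collections.<String,String>emptyMap(), Collections.<String>emptySet());
+ * conn.tableOperations().offline("mytable_mr");
+ * AccumuloInputFormat.setInputTableName(job, "mytable_mr");
+ * AccumuloInputFormat.setOfflineTableScan(job, true);
+ * </pre>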

+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support + * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. + * + *

+ * By default, this feature is disabled. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setOfflineTableScan(Job job, boolean enableFeature) { + InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature); + } + + /** + * Determines whether a configuration has the offline table scan feature enabled. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setOfflineTableScan(Job, boolean) + */ + protected static boolean isOfflineScan(JobContext context) { + return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context)); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return an Accumulo tablet locator + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.5.0 + * @deprecated since 1.6.0 + */ + @Deprecated + protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { + return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); + } + + protected abstract static class RecordReaderBase extends AbstractRecordReader { + + /** + * Apply the configured iterators from the configuration to the scanner for the specified table name + * + * @param context + * the Hadoop context for the configured job + * @param scanner + * the scanner to configure + * @since 1.6.0 + */ + @Override + protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + setupIterators(context, scanner, split); + } + + /** + * Apply the configured iterators from the configuration to the scanner. + * + * @param context + * the Hadoop context for the configured job + * @param scanner + * the scanner to configure + */ + @Deprecated + protected void setupIterators(TaskAttemptContext context, Scanner scanner) { + setupIterators(context, scanner, null); + } + + /** + * Initialize a scanner over the given input split using this task attempt configuration. + */ + protected void setupIterators(TaskAttemptContext context, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + List iterators = null; + if (null == split) { + iterators = getIterators(context); + } else { + iterators = split.getIterators(); + if (null == iterators) { + iterators = getIterators(context); + } + } + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + } + + /** + * @deprecated since 1.5.2; Use {@link org.apache.accumulo.core.client.mapreduce.RangeInputSplit} instead. 
+ * @see org.apache.accumulo.core.client.mapreduce.RangeInputSplit + */ + @Deprecated + public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit { + + public RangeInputSplit() { + super(); + } + + public RangeInputSplit(RangeInputSplit other) throws IOException { + super(other); + } + + protected RangeInputSplit(String table, Range range, String[] locations) { + super(table, "", range, locations); + } + + public RangeInputSplit(String table, String tableId, Range range, String[] locations) { + super(table, tableId, range, locations); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java new file mode 100644 index 0000000..e59451e --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * This class to holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table. + */ +public class InputTableConfig implements Writable { + + private List iterators; + private List ranges; + private Collection> columns; + + private boolean autoAdjustRanges = true; + private boolean useLocalIterators = false; + private boolean useIsolatedScanners = false; + private boolean offlineScan = false; + + public InputTableConfig() {} + + /** + * Creates a batch scan config object out of a previously serialized batch scan config object. + * + * @param input + * the data input of the serialized batch scan config + */ + public InputTableConfig(DataInput input) throws IOException { + readFields(input); + } + + /** + * Sets the input ranges to scan for all tables associated with this job. 
This will be added to any per-table ranges that have been set using + * + * @param ranges + * the ranges that will be mapped over + * @since 1.6.0 + */ + public InputTableConfig setRanges(List ranges) { + this.ranges = ranges; + return this; + } + + /** + * Returns the ranges to be queried in the configuration + */ + public List getRanges() { + return ranges != null ? ranges : new ArrayList(); + } + + /** + * Restricts the columns that will be mapped over for this job for the default input table. + * + * @param columns + * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is + * selected. An empty set is the default and is equivalent to scanning the all columns. + * @since 1.6.0 + */ + public InputTableConfig fetchColumns(Collection> columns) { + this.columns = columns; + return this; + } + + /** + * Returns the columns to be fetched for this configuration + */ + public Collection> getFetchedColumns() { + return columns != null ? columns : new HashSet>(); + } + + /** + * Set iterators on to be used in the query. + * + * @param iterators + * the configurations for the iterators + * @since 1.6.0 + */ + public InputTableConfig setIterators(List iterators) { + this.iterators = iterators; + return this; + } + + /** + * Returns the iterators to be set on this configuration + */ + public List getIterators() { + return iterators != null ? iterators : new ArrayList(); + } + + /** + * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. + * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. * + * + *

+ * By default, this feature is enabled. + * + * @param autoAdjustRanges + * the feature is enabled if true, disabled otherwise + * @see #setRanges(java.util.List) + * @since 1.6.0 + */ + public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) { + this.autoAdjustRanges = autoAdjustRanges; + return this; + } + + /** + * Determines whether a configuration has auto-adjust ranges enabled. + * + * @return false if the feature is disabled, true otherwise + * @since 1.6.0 + * @see #setAutoAdjustRanges(boolean) + */ + public boolean shouldAutoAdjustRanges() { + return autoAdjustRanges; + } + + /** + * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack + * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be + * available on the classpath for the task. + * + *

+ * By default, this feature is disabled. + * + * @param useLocalIterators + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public InputTableConfig setUseLocalIterators(boolean useLocalIterators) { + this.useLocalIterators = useLocalIterators; + return this; + } + + /** + * Determines whether a configuration uses local iterators. + * + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setUseLocalIterators(boolean) + */ + public boolean shouldUseLocalIterators() { + return useLocalIterators; + } + + /** + *

+ * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the + * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will + * fail. + * + *

+ * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. + * + *

+ * Reading the offline table will create the scan-time iterator stack in the map process, so any iterators that are configured for the table will need to be + * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS is non-standard. + * + *

+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map + * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The + * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. + * + *

+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support + * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. + * + *
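+ * <p>
+ * For example, an offline scan of a cloned table might be configured as follows (the table name is a placeholder, and registering the config through AccumuloMultiTableInputFormat is one assumed usage):
+ *
+ * <pre>
+ * InputTableConfig cfg = new InputTableConfig();
+ * cfg.setRanges(Collections.singletonList(new Range()));
+ * cfg.setOfflineScan(true);
+ * AccumuloMultiTableInputFormat.setInputTableConfigs(job, Collections.singletonMap("mytable_mr", cfg));
+ * </pre>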

+ * By default, this feature is disabled. + * + * @param offlineScan + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public InputTableConfig setOfflineScan(boolean offlineScan) { + this.offlineScan = offlineScan; + return this; + } + + /** + * Determines whether a configuration has the offline table scan feature enabled. + * + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setOfflineScan(boolean) + */ + public boolean isOfflineScan() { + return offlineScan; + } + + /** + * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job. + * + *

+ * By default, this feature is disabled. + * + * @param useIsolatedScanners + * the feature is enabled if true, disabled otherwise + * @since 1.6.0 + */ + public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) { + this.useIsolatedScanners = useIsolatedScanners; + return this; + } + + /** + * Determines whether a configuration has isolation enabled. + * + * @return true if the feature is enabled, false otherwise + * @since 1.6.0 + * @see #setUseIsolatedScanners(boolean) + */ + public boolean shouldUseIsolatedScanners() { + return useIsolatedScanners; + } + + /** + * Writes the state for the current object out to the specified {@link DataOutput} + * + * @param dataOutput + * the output for which to write the object's state + */ + @Override + public void write(DataOutput dataOutput) throws IOException { + if (iterators != null) { + dataOutput.writeInt(iterators.size()); + for (IteratorSetting setting : iterators) + setting.write(dataOutput); + } else { + dataOutput.writeInt(0); + } + if (ranges != null) { + dataOutput.writeInt(ranges.size()); + for (Range range : ranges) + range.write(dataOutput); + } else { + dataOutput.writeInt(0); + } + if (columns != null) { + dataOutput.writeInt(columns.size()); + for (Pair column : columns) { + if (column.getSecond() == null) { + dataOutput.writeInt(1); + column.getFirst().write(dataOutput); + } else { + dataOutput.writeInt(2); + column.getFirst().write(dataOutput); + column.getSecond().write(dataOutput); + } + } + } else { + dataOutput.writeInt(0); + } + dataOutput.writeBoolean(autoAdjustRanges); + dataOutput.writeBoolean(useLocalIterators); + dataOutput.writeBoolean(useIsolatedScanners); + } + + /** + * Reads the fields in the {@link DataInput} into the current object + * + * @param dataInput + * the input fields to read into the current object + */ + @Override + public void readFields(DataInput dataInput) throws IOException { + // load iterators + long iterSize = dataInput.readInt(); + if (iterSize > 0) + iterators = new ArrayList(); + for (int i = 0; i < iterSize; i++) + iterators.add(new IteratorSetting(dataInput)); + // load ranges + long rangeSize = dataInput.readInt(); + if (rangeSize > 0) + ranges = new ArrayList(); + for (int i = 0; i < rangeSize; i++) { + Range range = new Range(); + range.readFields(dataInput); + ranges.add(range); + } + // load columns + long columnSize = dataInput.readInt(); + if (columnSize > 0) + columns = new HashSet>(); + for (int i = 0; i < columnSize; i++) { + long numPairs = dataInput.readInt(); + Text colFam = new Text(); + colFam.readFields(dataInput); + if (numPairs == 1) { + columns.add(new Pair(colFam, null)); + } else if (numPairs == 2) { + Text colQual = new Text(); + colQual.readFields(dataInput); + columns.add(new Pair(colFam, colQual)); + } + } + autoAdjustRanges = dataInput.readBoolean(); + useLocalIterators = dataInput.readBoolean(); + useIsolatedScanners = dataInput.readBoolean(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + InputTableConfig that = (InputTableConfig) o; + + if (autoAdjustRanges != that.autoAdjustRanges) + return false; + if (offlineScan != that.offlineScan) + return false; + if (useIsolatedScanners != that.useIsolatedScanners) + return false; + if (useLocalIterators != that.useLocalIterators) + return false; + if (columns != null ? !columns.equals(that.columns) : that.columns != null) + return false; + if (iterators != null ? 
!iterators.equals(that.iterators) : that.iterators != null) + return false; + if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) + return false; + return true; + } + + @Override + public int hashCode() { + int result = 31 * (iterators != null ? iterators.hashCode() : 0); + result = 31 * result + (ranges != null ? ranges.hashCode() : 0); + result = 31 * result + (columns != null ? columns.hashCode() : 0); + result = 31 * result + (autoAdjustRanges ? 1 : 0); + result = 31 * result + (useLocalIterators ? 1 : 0); + result = 31 * result + (useIsolatedScanners ? 1 : 0); + result = 31 * result + (offlineScan ? 1 : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java new file mode 100644 index 0000000..4b5a149 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.log4j.Level; + +/** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. + */ +public class RangeInputSplit extends InputSplit implements Writable { + private Range range; + private String[] locations; + private String tableId, tableName, instanceName, zooKeepers, principal; + private TokenSource tokenSource; + private String tokenFile; + private AuthenticationToken token; + private Boolean offline, mockInstance, isolatedScan, localIterators; + private Authorizations auths; + private Set> fetchedColumns; + private List iterators; + private Level level; + + public RangeInputSplit() { + range = new Range(); + locations = new String[0]; + tableName = ""; + tableId = ""; + } + + public RangeInputSplit(RangeInputSplit split) throws IOException { + this.setRange(split.getRange()); + this.setLocations(split.getLocations()); + this.setTableName(split.getTableName()); + this.setTableId(split.getTableId()); + } + + protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { + this.range = range; + setLocations(locations); + this.tableName = table; + this.tableId = tableId; + } + + public Range getRange() { + return range; + } + + private static byte[] extractBytes(ByteSequence seq, int numBytes) { + byte[] bytes = new byte[numBytes + 1]; + bytes[0] = 0; + for (int i = 0; i < numBytes; i++) { + if (i >= seq.length()) + bytes[i + 1] = 0; + else + bytes[i + 1] = seq.byteAt(i); + } + return bytes; + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + + public float getProgress(Key currentKey) { + if (currentKey == null) + 
return 0f; + if (range.getStartKey() != null && range.getEndKey() != null) { + if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { + // just look at the row progress + return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { + // just look at the column family progress + return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { + // just look at the column qualifier progress + return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + // if we can't figure it out, then claim no progress + return 0f; + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + @Override + public long getLength() throws IOException { + Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); + Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); + int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); + long diff = 0; + + byte[] start = startRow.getBytes(); + byte[] stop = stopRow.getBytes(); + for (int i = 0; i < maxCommon; ++i) { + diff |= 0xff & (start[i] ^ stop[i]); + diff <<= Byte.SIZE; + } + + if (startRow.getLength() != stopRow.getLength()) + diff |= 0xff; + + return diff + 1; + } + + @Override + public String[] getLocations() throws IOException { + return Arrays.copyOf(locations, locations.length); + } + + @Override + public void readFields(DataInput in) throws IOException { + range.readFields(in); + tableName = in.readUTF(); + tableId = in.readUTF(); + int numLocs = in.readInt(); + locations = new String[numLocs]; + for (int i = 0; i < numLocs; ++i) + locations[i] = in.readUTF(); + + if (in.readBoolean()) { + isolatedScan = in.readBoolean(); + } + + if (in.readBoolean()) { + offline = in.readBoolean(); + } + + if (in.readBoolean()) { + localIterators = in.readBoolean(); + } + + if (in.readBoolean()) { + mockInstance = in.readBoolean(); + } + + if (in.readBoolean()) { + int numColumns = in.readInt(); + List columns = new ArrayList(numColumns); + for (int i = 0; i < numColumns; i++) { + columns.add(in.readUTF()); + } + + fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns); + } + + if (in.readBoolean()) { + String strAuths = in.readUTF(); + auths = new Authorizations(strAuths.getBytes(StandardCharsets.UTF_8)); + } + + if (in.readBoolean()) { + principal = in.readUTF(); + } + + if (in.readBoolean()) { + int ordinal = in.readInt(); + this.tokenSource = TokenSource.values()[ordinal]; + + switch (this.tokenSource) { + case INLINE: + String tokenClass = in.readUTF(); + byte[] base64TokenBytes = in.readUTF().getBytes(StandardCharsets.UTF_8); + byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes); + + this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes); + break; + + case FILE: + this.tokenFile = in.readUTF(); + + break; + default: + throw new IOException("Cannot parse unknown TokenSource ordinal"); + } + } + + if (in.readBoolean()) { + instanceName = in.readUTF(); + } + + if 
(in.readBoolean()) { + zooKeepers = in.readUTF(); + } + + if (in.readBoolean()) { + level = Level.toLevel(in.readInt()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + range.write(out); + out.writeUTF(tableName); + out.writeUTF(tableId); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) + out.writeUTF(locations[i]); + + out.writeBoolean(null != isolatedScan); + if (null != isolatedScan) { + out.writeBoolean(isolatedScan); + } + + out.writeBoolean(null != offline); + if (null != offline) { + out.writeBoolean(offline); + } + + out.writeBoolean(null != localIterators); + if (null != localIterators) { + out.writeBoolean(localIterators); + } + + out.writeBoolean(null != mockInstance); + if (null != mockInstance) { + out.writeBoolean(mockInstance); + } + + out.writeBoolean(null != fetchedColumns); + if (null != fetchedColumns) { + String[] cols = InputConfigurator.serializeColumns(fetchedColumns); + out.writeInt(cols.length); + for (String col : cols) { + out.writeUTF(col); + } + } + + out.writeBoolean(null != auths); + if (null != auths) { + out.writeUTF(auths.serialize()); + } + + out.writeBoolean(null != principal); + if (null != principal) { + out.writeUTF(principal); + } + + out.writeBoolean(null != tokenSource); + if (null != tokenSource) { + out.writeInt(tokenSource.ordinal()); + + if (null != token && null != tokenFile) { + throw new IOException("Cannot use both inline AuthenticationToken and file-based AuthenticationToken"); + } else if (null != token) { + out.writeUTF(token.getClass().getCanonicalName()); + out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); + } else { + out.writeUTF(tokenFile); + } + } + + out.writeBoolean(null != instanceName); + if (null != instanceName) { + out.writeUTF(instanceName); + } + + out.writeBoolean(null != zooKeepers); + if (null != zooKeepers) { + out.writeUTF(zooKeepers); + } + + out.writeBoolean(null != level); + if (null != level) { + out.writeInt(level.toInt()); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(256); + sb.append("Range: ").append(range); + sb.append(" Locations: ").append(Arrays.asList(locations)); + sb.append(" Table: ").append(tableName); + sb.append(" TableID: ").append(tableId); + sb.append(" InstanceName: ").append(instanceName); + sb.append(" zooKeepers: ").append(zooKeepers); + sb.append(" principal: ").append(principal); + sb.append(" tokenSource: ").append(tokenSource); + sb.append(" authenticationToken: ").append(token); + sb.append(" authenticationTokenFile: ").append(tokenFile); + sb.append(" Authorizations: ").append(auths); + sb.append(" offlineScan: ").append(offline); + sb.append(" mockInstance: ").append(mockInstance); + sb.append(" isolatedScan: ").append(isolatedScan); + sb.append(" localIterators: ").append(localIterators); + sb.append(" fetchColumns: ").append(fetchedColumns); + sb.append(" iterators: ").append(iterators); + sb.append(" logLevel: ").append(level); + return sb.toString(); + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String table) { + this.tableName = table; + } + + public void setTableId(String tableId) { + this.tableId = tableId; + } + + public String getTableId() { + return tableId; + } + + public Instance getInstance() { + if (null == instanceName) { + return null; + } + + if (isMockInstance()) { + return new MockInstance(getInstanceName()); + } + + if (null == zooKeepers) { + return null; + } + + return new 
ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(getInstanceName()).withZkHosts(getZooKeepers())); + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public String getZooKeepers() { + return zooKeepers; + } + + public void setZooKeepers(String zooKeepers) { + this.zooKeepers = zooKeepers; + } + + public String getPrincipal() { + return principal; + } + + public void setPrincipal(String principal) { + this.principal = principal; + } + + public AuthenticationToken getToken() { + return token; + } + + public void setToken(AuthenticationToken token) { + this.tokenSource = TokenSource.INLINE; + this.token = token; + } + + public void setToken(String tokenFile) { + this.tokenSource = TokenSource.FILE; + this.tokenFile = tokenFile; + } + + public Boolean isOffline() { + return offline; + } + + public void setOffline(Boolean offline) { + this.offline = offline; + } + + public void setLocations(String[] locations) { + this.locations = Arrays.copyOf(locations, locations.length); + } + + public Boolean isMockInstance() { + return mockInstance; + } + + public void setMockInstance(Boolean mockInstance) { + this.mockInstance = mockInstance; + } + + public Boolean isIsolatedScan() { + return isolatedScan; + } + + public void setIsolatedScan(Boolean isolatedScan) { + this.isolatedScan = isolatedScan; + } + + public Authorizations getAuths() { + return auths; + } + + public void setAuths(Authorizations auths) { + this.auths = auths; + } + + public void setRange(Range range) { + this.range = range; + } + + public Boolean usesLocalIterators() { + return localIterators; + } + + public void setUsesLocalIterators(Boolean localIterators) { + this.localIterators = localIterators; + } + + public Set> getFetchedColumns() { + return fetchedColumns; + } + + public void setFetchedColumns(Collection> fetchedColumns) { + this.fetchedColumns = new HashSet>(); + for (Pair columns : fetchedColumns) { + this.fetchedColumns.add(columns); + } + } + + public void setFetchedColumns(Set> fetchedColumns) { + this.fetchedColumns = fetchedColumns; + } + + public List getIterators() { + return iterators; + } + + public void setIterators(List iterators) { + this.iterators = iterators; + } + + public Level getLogLevel() { + return level; + } + + public void setLogLevel(Level level) { + this.level = level; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java new file mode 100644 index 0000000..4610556 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce.lib.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; +import org.apache.accumulo.core.security.Credentials; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * @since 1.6.0 + */ +public class ConfiguratorBase { + + /** + * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}. + * + * @since 1.6.0 + */ + public static enum ConnectorInfo { + IS_CONFIGURED, PRINCIPAL, TOKEN, + } + + public static enum TokenSource { + FILE, INLINE; + + private String prefix; + + private TokenSource() { + prefix = name().toLowerCase() + ":"; + } + + public String prefix() { + return prefix; + } + } + + /** + * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}. + * + * @since 1.6.0 + */ + public static enum InstanceOpts { + TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG; + } + + /** + * Configuration keys for general configuration options. + * + * @since 1.6.0 + */ + public static enum GeneralOpts { + LOG_LEVEL + } + + /** + * Provides a configuration key for a given feature enum, prefixed by the implementingClass + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param e + * the enum used to provide the unique part of the configuration key + * @return the configuration key + * @since 1.6.0 + */ + protected static String enumToConfKey(Class implementingClass, Enum e) { + return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase()); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *
+ * WARNING: The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param principal + * a valid Accumulo user name + * @param token + * the user's password + * @since 1.6.0 + */ + public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, AuthenticationToken token) + throws AccumuloSecurityException { + if (isConnectorInfoSet(implementingClass, conf)) + throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); + + checkArgument(principal != null, "principal is null"); + checkArgument(token != null, "token is null"); + conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); + conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); + conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), + TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token))); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + *
+ * Pulls a token file into the Distributed Cache that contains the authentication token in an attempt to be more secure than storing the password in the + * Configuration. Token file created with "bin/accumulo create-token". + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param principal + * a valid Accumulo user name + * @param tokenFile + * the path to the token file in DFS + * @since 1.6.0 + */ + public static void setConnectorInfo(Class implementingClass, Configuration conf, String principal, String tokenFile) throws AccumuloSecurityException { + if (isConnectorInfoSet(implementingClass, conf)) + throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); + + checkArgument(principal != null, "principal is null"); + checkArgument(tokenFile != null, "tokenFile is null"); + + try { + DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf); + } catch (URISyntaxException e) { + throw new IllegalStateException("Unable to add tokenFile \"" + tokenFile + "\" to distributed cache."); + } + + conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); + conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal); + conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.FILE.prefix() + tokenFile); + } + + /** + * Determines if the connector info has already been set for this instance. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the connector info has already been set, false otherwise + * @since 1.6.0 + * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) + */ + public static Boolean isConnectorInfoSet(Class implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false); + } + + /** + * Gets the user name from the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return the principal + * @since 1.6.0 + * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) + */ + public static String getPrincipal(Class implementingClass, Configuration conf) { + return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL)); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) + * @see #setConnectorInfo(Class, Configuration, String, String) + */ + public static AuthenticationToken getAuthenticationToken(Class implementingClass, Configuration conf) { + String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN)); + if (token == null || token.isEmpty()) + return null; + if (token.startsWith(TokenSource.INLINE.prefix())) { + String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2); + if (args.length == 2) + return AuthenticationTokenSerializer.deserialize(args[0], Base64.decodeBase64(args[1].getBytes(StandardCharsets.UTF_8))); + } else if (token.startsWith(TokenSource.FILE.prefix())) { + String tokenFileName = token.substring(TokenSource.FILE.prefix().length()); + return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName); + } + + throw new IllegalStateException("Token was not properly serialized into the configuration"); + } + + /** + * Reads from the token file in distributed cache. Currently, the token file stores data separated by colons e.g. principal:token_class:token + * + * @param conf + * the Hadoop context for the configured job + * @return path to the token file as a String + * @since 1.6.0 + * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken) + */ + public static AuthenticationToken getTokenFromFile(Configuration conf, String principal, String tokenFile) { + FSDataInputStream in = null; + try { + URI[] uris = DistributedCacheHelper.getCacheFiles(conf); + Path path = null; + for (URI u : uris) { + if (u.toString().equals(tokenFile)) { + path = new Path(u); + } + } + if (path == null) { + throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + "\" in cache."); + } + FileSystem fs = FileSystem.get(conf); + in = fs.open(path); + } catch (IOException e) { + throw new IllegalArgumentException("Couldn't open password file called \"" + tokenFile + "\"."); + } + try (java.util.Scanner fileScanner = new java.util.Scanner(in)) { + while (fileScanner.hasNextLine()) { + Credentials creds = Credentials.deserialize(fileScanner.nextLine()); + if (principal.equals(creds.getPrincipal())) { + return creds.getToken(); + } + } + throw new IllegalArgumentException("Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\""); + } + } + + /** + * Configures a {@link ZooKeeperInstance} for this job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param clientConfig + * client configuration for specifying connection timeouts, SSL connection options, etc. 
+ * @since 1.6.0 + */ + public static void setZooKeeperInstance(Class implementingClass, Configuration conf, ClientConfiguration clientConfig) { + String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); + if (!conf.get(key, "").isEmpty()) + throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); + conf.set(key, "ZooKeeperInstance"); + if (clientConfig != null) { + conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG), clientConfig.serialize()); + } + } + + /** + * Configures a {@link MockInstance} for this job. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param instanceName + * the Accumulo instance name + * @since 1.6.0 + */ + public static void setMockInstance(Class implementingClass, Configuration conf, String instanceName) { + String key = enumToConfKey(implementingClass, InstanceOpts.TYPE); + if (!conf.get(key, "").isEmpty()) + throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key)); + conf.set(key, "MockInstance"); + + checkArgument(instanceName != null, "instanceName is null"); + conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName); + } + + /** + * Initializes an Accumulo {@link Instance} based on the configuration. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return an Accumulo instance + * @since 1.6.0 + * @see #setZooKeeperInstance(Class, Configuration, ClientConfiguration) + * @see #setMockInstance(Class, Configuration, String) + */ + public static Instance getInstance(Class implementingClass, Configuration conf) { + String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), ""); + if ("MockInstance".equals(instanceType)) + return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME))); + else if ("ZooKeeperInstance".equals(instanceType)) { + String clientConfigString = conf.get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG)); + if (clientConfigString == null) { + String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)); + String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS)); + return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers)); + } else { + return new ZooKeeperInstance(ClientConfiguration.deserialize(clientConfigString)); + } + } else if (instanceType.isEmpty()) + throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName()); + else + throw new IllegalStateException("Unrecognized instance type " + instanceType); + } + + /** + * Sets the log level for this job. 
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param level
+   *          the logging level
+   * @since 1.6.0
+   */
+  public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level level) {
+    checkArgument(level != null, "level is null");
+    Logger.getLogger(implementingClass).setLevel(level);
+    conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt());
+  }
+
+  /**
+   * Gets the log level from this configuration.
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the log level
+   * @since 1.6.0
+   * @see #setLogLevel(Class, Configuration, Level)
+   */
+  public static Level getLogLevel(Class<?> implementingClass, Configuration conf) {
+    return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
new file mode 100644
index 0000000..c694b9a
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/DistributedCacheHelper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.lib.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * @since 1.6.0
+ */
+@SuppressWarnings("deprecation")
+public class DistributedCacheHelper {
+
+  /**
+   * @since 1.6.0
+   */
+  public static void addCacheFile(URI uri, Configuration conf) {
+    DistributedCache.addCacheFile(uri, conf);
+  }
+
+  /**
+   * @since 1.6.0
+   */
+  public static URI[] getCacheFiles(Configuration conf) throws IOException {
+    return DistributedCache.getCacheFiles(conf);
+  }
+
+  /**
+   * @since 1.6.0
+   */
+  public static Path[] getLocalCacheFiles(Configuration conf) throws IOException {
+    return DistributedCache.getLocalCacheFiles(conf);
+  }
+}
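
Reviewer note on RangeInputSplit above: the write()/readFields() pair guards every optional field with a boolean flag, so a split round-trips only the values that were actually set. The sketch below exercises that pairing using only the public setters from this diff plus standard java.io streams; the table, instance, user, and password values are made-up, and PasswordToken (from accumulo-core) is assumed as the concrete AuthenticationToken. In a real job the framework builds these splits in getSplits(), not user code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Level;

public class RangeInputSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Populate a split the way getSplits() would, via the setters in this diff.
    RangeInputSplit split = new RangeInputSplit();
    split.setRange(new Range("a", "z"));
    split.setTableName("exampleTable");        // hypothetical table name
    split.setTableId("1");                     // hypothetical table id
    split.setInstanceName("exampleInstance");  // hypothetical instance
    split.setZooKeepers("localhost:2181");
    split.setPrincipal("root");
    split.setToken(new PasswordToken("secret"));  // stored inline (TokenSource.INLINE)
    split.setAuths(new Authorizations("public"));
    split.setOffline(false);
    split.setIsolatedScan(false);
    split.setUsesLocalIterators(false);
    split.setMockInstance(true);
    split.setLogLevel(Level.INFO);

    // write() serializes the range, locations, table info, and each non-null optional field.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));

    // readFields() restores the same state on a fresh instance.
    RangeInputSplit copy = new RangeInputSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // toString() dumps every field, as defined in the diff.
    System.out.println(copy);
  }
}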
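Similarly, for ConfiguratorBase: these lib.impl helpers are what the public static configurators on the input/output formats delegate to, keying every property by the implementing class's simple name (see enumToConfKey). The sketch below is a minimal illustration of the inline-token path on a plain Hadoop Configuration; it assumes AccumuloInputFormat as the key prefix and uses placeholder credentials, and a real job would normally call the format's own setConnectorInfo/setZooKeeperInstance methods instead of touching this package directly.

import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;

public class ConfiguratorBaseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Stores the principal and a Base64-encoded token under keys prefixed with
    // "AccumuloInputFormat." -- obscured in the Configuration, not secure.
    ConfiguratorBase.setConnectorInfo(AccumuloInputFormat.class, conf, "root", new PasswordToken("secret"));
    ConfiguratorBase.setMockInstance(AccumuloInputFormat.class, conf, "exampleInstance");
    ConfiguratorBase.setLogLevel(AccumuloInputFormat.class, conf, Level.DEBUG);

    // Read the values back, as the formats do at task time.
    String principal = ConfiguratorBase.getPrincipal(AccumuloInputFormat.class, conf);
    AuthenticationToken token = ConfiguratorBase.getAuthenticationToken(AccumuloInputFormat.class, conf);
    Instance instance = ConfiguratorBase.getInstance(AccumuloInputFormat.class, conf);

    System.out.println(principal + " -> " + token.getClass().getSimpleName() + " @ " + instance.getInstanceName());
  }
}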