accumulo-commits mailing list archives

From vi...@apache.org
Subject svn commit: r1437726 [2/10] - in /accumulo/branches/ACCUMULO-259: ./ assemble/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/ conf/examples/3GB/native-standalone/ co...
Date Wed, 23 Jan 2013 20:52:04 GMT
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Wed Jan 23 20:51:59 2013
@@ -21,29 +21,33 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type Key and Value to the Map() and
- * Reduce() functions.
+ * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and
+ * {@link Value} to the Map function.
  * 
- * The user must specify the following via static methods:
+ * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>AccumuloInputFormat.setInputTableInfo(job, username, password, table, auths)
- * <li>AccumuloInputFormat.setZooKeeperInstance(job, instanceName, hosts)
+ * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
- * Other static methods are optional
+ * Other static methods are optional.
  */
-
 public class AccumuloInputFormat extends InputFormatBase<Key,Value> {
   @Override
  public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    log.setLevel(getLogLevel(context.getConfiguration()));
+    log.setLevel(getLogLevel(context));
     return new RecordReaderBase<Key,Value>() {
       @Override
       public boolean nextKeyValue() throws IOException, InterruptedException {
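
The revised class comment above now reads as a short recipe: connector info, input table, scan authorizations, and exactly one instance type. A minimal job-setup sketch against the signatures referenced in that Javadoc follows; the principal, password, table, instance name, and ZooKeeper hosts are placeholders, and since the API on this branch is still moving, treat the sketch as illustrative rather than canonical.

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ReadJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "read-from-accumulo");
        job.setInputFormatClass(AccumuloInputFormat.class);
        // Connector info: principal plus password bytes, per setConnectorInfo(Job, String, byte[]).
        AccumuloInputFormat.setConnectorInfo(job, "reader", "readerPass".getBytes());
        AccumuloInputFormat.setInputTableName(job, "mytable");
        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
        // Exactly one instance type; setMockInstance(Job, String) is the in-memory alternative for tests.
        AccumuloInputFormat.setZooKeeperInstance(job, "myInstance", "zkhost1:2181,zkhost2:2181");
      }
    }

The mapper then receives one Key/Value pair per call, exactly as the old setInputTableInfo-based setup did.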

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Wed Jan 23 20:51:59 2013
@@ -34,16 +34,18 @@ import org.apache.accumulo.core.client.M
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.util.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.commons.codec.binary.Base64;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.UserPassToken;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -54,182 +56,245 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
- * This class allows MapReduce jobs to use Accumulo as the sink of data. This output format accepts keys and values of type Text (for a table name) and Mutation
- * from the Map() and Reduce() functions.
+ * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table
+ * name) and {@link Mutation} from the Map and Reduce functions.
  * 
- * The user must specify the following via static methods:
+ * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>AccumuloOutputFormat.setOutputInfo(job, username, password, createTables, defaultTableName)
- * <li>AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, hosts)
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
- * Other static methods are optional
+ * Other static methods are optional.
  */
 public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
-  private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class);
   
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
-  private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
-  private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
-  
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  private static final String MOCK = ".useMockInstance";
-  
-  private static final String CREATETABLES = PREFIX + ".createtables";
-  private static final String LOGLEVEL = PREFIX + ".loglevel";
-  private static final String SIMULATE = PREFIX + ".simulate";
-  
-  // BatchWriter options
-  private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
-  private static final String MAX_LATENCY = PREFIX + ".maxlatency";
-  private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
-  private static final String TIMEOUT = PREFIX + ".timeout";
-  
-  private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB
-  private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
-  private static final int DEFAULT_NUM_WRITE_THREADS = 2;
-  
-  /**
-   * Configure the output format.
-   * 
-   * @param conf
-   *          the Map/Reduce job object
-   * @param user
-   *          the username, which must have the Table.CREATE permission to create tables
-   * @param passwd
-   *          the passwd for the username
-   * @param createTables
-   *          the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores.
-   * @param defaultTable
-   *          the table to use when the tablename is null in the write call
-   */
-  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
-    if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per job");
-    conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.setBoolean(CREATETABLES, createTables);
-    if (defaultTable != null)
-      conf.set(DEFAULT_TABLE_NAME, defaultTable);
-  }
-  
-  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
-    System.out.println("instance set: " + conf.get(INSTANCE_HAS_BEEN_SET));
-  }
-  
-  public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
-  }
+  private static final Class<?> CLASS = AccumuloOutputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
-   * see {@link BatchWriterConfig#setMaxMemory(long)}
+   * Sets the connector information needed to communicate with Accumulo in this job.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param token
+   *          a valid AccumuloToken (principal must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true)
+   * @since 1.5.0
    */
-  
-  public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
-    conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+  public static void setConnectorInfo(Job job, AccumuloToken<?,?> token) {
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), token);
   }
   
   /**
-   * see {@link BatchWriterConfig#setMaxLatency(long, TimeUnit)}
+   * Determines if the connector has been configured.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the connector has been configured, false otherwise
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  
-  public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
-    conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+  protected static Boolean isConnectorInfoSet(JobContext context) {
+    return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
   }
   
   /**
-   * see {@link BatchWriterConfig#setMaxWriteThreads(int)}
+   * Gets the AccumuloToken from the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the AccumuloToken
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, AccumuloToken)
    */
-  
-  public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
-    conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+  protected static AccumuloToken<?,?> getToken(JobContext context) {
+    return OutputConfigurator.getToken(CLASS, context.getConfiguration());
   }
-  
+
   /**
-   * see {@link BatchWriterConfig#setTimeout(long, TimeUnit)}
+   * Configures a {@link ZooKeeperInstance} for this job.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param instanceName
+   *          the Accumulo instance name
+   * @param zooKeepers
+   *          a comma-separated list of zookeeper servers
+   * @since 1.5.0
    */
-  
-  public static void setTimeout(Configuration conf, long time, TimeUnit timeUnit) {
-    conf.setLong(TIMEOUT, timeUnit.toMillis(time));
-  }
-  
-  public static void setLogLevel(Configuration conf, Level level) {
-    ArgumentChecker.notNull(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
+    OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
   
-  public static void setSimulationMode(Configuration conf) {
-    conf.setBoolean(SIMULATE, true);
+  /**
+   * Configures a {@link MockInstance} for this job.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param instanceName
+   *          the Accumulo instance name
+   * @since 1.5.0
+   */
+  public static void setMockInstance(Job job, String instanceName) {
+    OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
   
-  protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+  /**
+   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
+   * @since 1.5.0
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
+   */
+  protected static Instance getInstance(JobContext context) {
+    return OutputConfigurator.getInstance(CLASS, context.getConfiguration());
   }
   
   /**
-   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
-   * string, and is not intended to be secure.
+   * Sets the log level for this job.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param level
+   *          the logging level
+   * @since 1.5.0
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  public static void setLogLevel(Job job, Level level) {
+    OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
   
-  protected static boolean canCreateTables(Configuration conf) {
-    return conf.getBoolean(CREATETABLES, false);
+  /**
+   * Gets the log level from this configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the log level
+   * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
+   */
+  protected static Level getLogLevel(JobContext context) {
+    return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
   
-  protected static String getDefaultTableName(Configuration conf) {
-    return conf.get(DEFAULT_TABLE_NAME);
+  /**
+   * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
+   * underscores.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table to use when the tablename is null in the write call
+   * @since 1.5.0
+   */
+  public static void setDefaultTableName(Job job, String tableName) {
+    OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
   }
   
-  protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+  /**
+   * Gets the default table name from the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the default table name
+   * @since 1.5.0
+   * @see #setDefaultTableName(Job, String)
+   */
+  protected static String getDefaultTableName(JobContext context) {
+    return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration());
   }
   
-  protected static long getMaxMutationBufferSize(Configuration conf) {
-    return conf.getLong(MAX_MUTATION_BUFFER_SIZE, DEFAULT_MAX_MUTATION_BUFFER_SIZE);
+  /**
+   * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig} with sensible built-in defaults is
+   * used. Setting the configuration multiple times overwrites any previous configuration.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param bwConfig
+   *          the configuration for the {@link BatchWriter}
+   * @since 1.5.0
+   */
+  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
+    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
   }
   
-  protected static int getMaxLatency(Configuration conf) {
-    return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY);
+  /**
+   * Gets the {@link BatchWriterConfig} settings.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the configuration object
+   * @since 1.5.0
+   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
+   */
+  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
+    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
   }
   
-  protected static int getMaxWriteThreads(Configuration conf) {
-    return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS);
+  /**
+   * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setCreateTables(Job job, boolean enableFeature) {
+    OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
   }
   
-  protected static long getTimeout(Configuration conf) {
-    return conf.getLong(TIMEOUT, Long.MAX_VALUE);
+  /**
+   * Determines whether tables are permitted to be created as needed.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setCreateTables(Job, boolean)
+   */
+  protected static Boolean canCreateTables(JobContext context) {
+    return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration());
   }
   
-  protected static Level getLogLevel(Configuration conf) {
-    if (conf.get(LOGLEVEL) != null)
-      return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
-    return null;
+  /**
+   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setSimulationMode(Job job, boolean enableFeature) {
+    OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
   }
   
-  protected static boolean getSimulationMode(Configuration conf) {
-    return conf.getBoolean(SIMULATE, false);
+  /**
+   * Determines whether simulation mode is enabled for this job.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setSimulationMode(Job, boolean)
+   */
+  protected static Boolean getSimulationMode(JobContext context) {
+    return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration());
   }
   
+  /**
+   * A base class to be used to create {@link RecordWriter} instances that write to Accumulo.
+   */
   protected static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> {
     private MultiTableBatchWriter mtbw = null;
     private HashMap<Text,BatchWriter> bws = null;
@@ -243,26 +308,24 @@ public class AccumuloOutputFormat extend
     
     private Connector conn;
     
-    protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException {
-      Level l = getLogLevel(conf);
+    protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException {
+      Level l = getLogLevel(context);
       if (l != null)
-        log.setLevel(getLogLevel(conf));
-      this.simulate = getSimulationMode(conf);
-      this.createTables = canCreateTables(conf);
+        log.setLevel(getLogLevel(context));
+      this.simulate = getSimulationMode(context);
+      this.createTables = canCreateTables(context);
       
       if (simulate)
         log.info("Simulating output only. No writes to tables will occur");
       
       this.bws = new HashMap<Text,BatchWriter>();
       
-      String tname = getDefaultTableName(conf);
+      String tname = getDefaultTableName(context);
       this.defaultTableName = (tname == null) ? null : new Text(tname);
       
       if (!simulate) {
-        this.conn = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
-        mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(conf))
-            .setMaxLatency(getMaxLatency(conf), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(conf))
-            .setTimeout(getTimeout(conf), TimeUnit.MILLISECONDS));
+        this.conn = getInstance(context).getConnector(getToken(context));
+        mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context));
       }
     }
     
@@ -391,17 +454,12 @@ public class AccumuloOutputFormat extend
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    checkOutputSpecs(job.getConfiguration());
-  }
-  
-  public void checkOutputSpecs(Configuration conf) throws IOException {
-    if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Output info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
+    if (!isConnectorInfoSet(job))
+      throw new IOException("Connector info has not been set.");
     try {
-      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
-      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
+      // if the instance isn't configured, it will complain here
+      Connector c = getInstance(job).getConnector(getToken(job));
+      if (!c.securityOperations().authenticateUser(getToken(job)))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
@@ -418,9 +476,172 @@ public class AccumuloOutputFormat extend
   @Override
  public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
     try {
-      return new AccumuloRecordWriter(attempt.getConfiguration());
+      return new AccumuloRecordWriter(attempt);
     } catch (Exception e) {
       throw new IOException(e);
     }
   }
+  
+  // ----------------------------------------------------------------------------------------------------
+  // Everything below this line is deprecated and should go away in future versions
+  // ----------------------------------------------------------------------------------------------------
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setCreateTables(Job, boolean)}, and
+   *             {@link #setDefaultTableName(Job, String)} instead.
+   */
+  @Deprecated
+  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
+    OutputConfigurator.setConnectorInfo(CLASS, conf, new UserPassToken(user, passwd));
+    OutputConfigurator.setCreateTables(CLASS, conf, createTables);
+    OutputConfigurator.setDefaultTableName(CLASS, conf, defaultTable);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
+   */
+  @Deprecated
+  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    OutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
+   */
+  @Deprecated
+  public static void setMockInstance(Configuration conf, String instanceName) {
+    OutputConfigurator.setMockInstance(CLASS, conf, instanceName);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
+   */
+  @Deprecated
+  public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxMemory(numberOfBytes);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
+   */
+  @Deprecated
+  public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxLatency(numberOfMilliseconds, TimeUnit.MILLISECONDS);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
+   */
+  @Deprecated
+  public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxWriteThreads(numberOfThreads);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
+   */
+  @Deprecated
+  public static void setLogLevel(Configuration conf, Level level) {
+    OutputConfigurator.setLogLevel(CLASS, conf, level);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setSimulationMode(Job, boolean)} instead.
+   */
+  @Deprecated
+  public static void setSimulationMode(Configuration conf) {
+    OutputConfigurator.setSimulationMode(CLASS, conf, true);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getUsername(Configuration conf) {
+    return OutputConfigurator.getToken(CLASS, conf).getPrincipal();
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
+   */
+  @Deprecated
+  protected static byte[] getPassword(Configuration conf) {
+    AccumuloToken<?,?> token = OutputConfigurator.getToken(CLASS, conf);
+    if (token instanceof UserPassToken) {
+      UserPassToken upt = (UserPassToken) token;
+      return upt.getPassword();
+    }
+    throw new RuntimeException("Not applicable for non-UserPassTokens");
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #canCreateTables(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean canCreateTables(Configuration conf) {
+    return OutputConfigurator.canCreateTables(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getDefaultTableName(JobContext)} instead.
+   */
+  @Deprecated
+  protected static String getDefaultTableName(Configuration conf) {
+    return OutputConfigurator.getDefaultTableName(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Instance getInstance(Configuration conf) {
+    return OutputConfigurator.getInstance(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
+   */
+  @Deprecated
+  protected static long getMaxMutationBufferSize(Configuration conf) {
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxMemory();
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
+   */
+  @Deprecated
+  protected static int getMaxLatency(Configuration conf) {
+    return (int) OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxLatency(TimeUnit.MILLISECONDS);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
+   */
+  @Deprecated
+  protected static int getMaxWriteThreads(Configuration conf) {
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxWriteThreads();
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
+   */
+  @Deprecated
+  protected static Level getLogLevel(Configuration conf) {
+    return OutputConfigurator.getLogLevel(CLASS, conf);
+  }
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #getSimulationMode(JobContext)} instead.
+   */
+  @Deprecated
+  protected static boolean getSimulationMode(Configuration conf) {
+    return OutputConfigurator.getSimulationMode(CLASS, conf);
+  }
+  
 }
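
The deprecated block above doubles as a migration map: setOutputInfo splits into setConnectorInfo, setCreateTables, and setDefaultTableName, while the four per-option BatchWriter setters (max memory, max latency, write threads, timeout) fold into a single BatchWriterConfig. A sketch of the new Job-based setup, assuming the token-based setConnectorInfo added in this diff; the principal, password, table, and instance names are placeholders:

    import java.util.concurrent.TimeUnit;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.security.tokens.UserPassToken;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WriteJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "write-to-accumulo");
        job.setOutputFormatClass(AccumuloOutputFormat.class);
        // Token form shown in this diff; a (Job, String, byte[]) overload is referenced in the Javadoc.
        AccumuloOutputFormat.setConnectorInfo(job, new UserPassToken("writer", "writerPass".getBytes()));
        AccumuloOutputFormat.setZooKeeperInstance(job, "myInstance", "zkhost1:2181,zkhost2:2181");
        AccumuloOutputFormat.setDefaultTableName(job, "mytable"); // used when a mutation's table name is null
        AccumuloOutputFormat.setCreateTables(job, true); // principal needs Table.CREATE
        // One BatchWriterConfig replaces setMaxMutationBufferSize/setMaxLatency/setMaxWriteThreads/setTimeout.
        BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(50 * 1024 * 1024)
            .setMaxLatency(60, TimeUnit.SECONDS).setMaxWriteThreads(4);
        AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
      }
    }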

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Wed Jan 23 20:51:59 2013
@@ -22,12 +22,30 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+/**
+ * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} as keys, and a
+ * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function.
+ * 
+ * The user must specify the following via static configurator methods:
+ * 
+ * <ul>
+ * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}
+ * </ul>
+ * 
+ * Other static methods are optional.
+ */
public class AccumuloRowInputFormat extends InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> {
   @Override
  public RecordReader<Text,PeekingIterator<Entry<Key,Value>>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
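
Per the new class comment, each map call receives a whole row: the row id as a Text key and a PeekingIterator over that row's Key/Value cells as the value. A hedged mapper sketch (not part of this commit) that simply counts cells per row:

    import java.io.IOException;
    import java.util.Map.Entry;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.PeekingIterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class RowCellCounter extends Mapper<Text,PeekingIterator<Entry<Key,Value>>,Text,LongWritable> {
      @Override
      protected void map(Text row, PeekingIterator<Entry<Key,Value>> cells, Context context)
          throws IOException, InterruptedException {
        long count = 0;
        while (cells.hasNext()) {
          cells.next(); // each entry is one Key/Value cell within this row
          count++;
        }
        context.write(row, new LongWritable(count));
      }
    }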


