accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [15/21] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Date Fri, 24 Jan 2014 02:15:48 GMT
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
	core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
	test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
	test/system/continuous/run-verify.sh


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/23bb4321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/23bb4321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/23bb4321

Branch: refs/heads/master
Commit: 23bb43210afcf7fa57ad1fba19ec62f228831908
Parents: a147acd 36cec4f
Author: Josh Elser <elserj@apache.org>
Authored: Thu Jan 23 19:07:16 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Jan 23 21:00:49 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/OfflineScanner.java        |   17 +-
 .../core/client/mapreduce/RangeInputSplit.java  |   13 +-
 .../mapreduce/lib/util/ConfiguratorBase.java    |    8 +-
 .../core/client/mapreduce/InputFormatBase.java  | 1634 ------------------
 .../test/continuous/ContinuousVerify.java       |   21 -
 test/system/continuous/run-verify.sh            |   24 +-
 6 files changed, 15 insertions(+), 1702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 3a49608,0000000..5f3069a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@@ -1,411 -1,0 +1,416 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
++import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
++import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
++import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.KeyValue;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
 +import org.apache.accumulo.core.iterators.system.DeletingIterator;
 +import org.apache.accumulo.core.iterators.system.MultiIterator;
 +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.ArgumentChecker;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.commons.lang.NotImplementedException;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +
 +class OfflineIterator implements Iterator<Entry<Key,Value>> {
 +  
 +  static class OfflineIteratorEnvironment implements IteratorEnvironment {
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName)
throws IOException {
 +      throw new NotImplementedException();
 +    }
 +    
 +    @Override
 +    public AccumuloConfiguration getConfig() {
 +      return AccumuloConfiguration.getDefaultConfiguration();
 +    }
 +    
 +    @Override
 +    public IteratorScope getIteratorScope() {
 +      return IteratorScope.scan;
 +    }
 +    
 +    @Override
 +    public boolean isFullMajorCompaction() {
 +      return false;
 +    }
 +    
 +    private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators =
new ArrayList<SortedKeyValueIterator<Key,Value>>();
 +    
 +    @Override
 +    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
 +      topLevelIterators.add(iter);
 +    }
 +    
 +    SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value>
iter) {
 +      if (topLevelIterators.isEmpty())
 +        return iter;
 +      ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
 +      allIters.add(iter);
 +      return new MultiIterator(allIters, false);
 +    }
 +  }
 +  
 +  private SortedKeyValueIterator<Key,Value> iter;
 +  private Range range;
 +  private KeyExtent currentExtent;
 +  private Connector conn;
 +  private String tableId;
 +  private Authorizations authorizations;
 +  private Instance instance;
 +  private ScannerOptions options;
 +  private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-   
++  private AccumuloConfiguration config;
++
 +  /**
 +   * @param instance
 +   * @param credentials
 +   * @param authorizations
 +   * @param table
 +   */
 +  public OfflineIterator(ScannerOptions options, Instance instance, TCredentials credentials,
Authorizations authorizations, Text table, Range range) {
 +    this.options = new ScannerOptions(options);
 +    this.instance = instance;
 +    this.range = range;
 +    
 +    if (this.options.fetchedColumns.size() > 0) {
 +      this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
 +    }
 +    
 +    this.tableId = table.toString();
 +    this.authorizations = authorizations;
 +    this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
 +    
 +    try {
 +      conn = instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials));
++      config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration());
 +      nextTablet();
 +      
 +      while (iter != null && !iter.hasTop())
 +        nextTablet();
 +      
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +  
 +  @Override
 +  public boolean hasNext() {
 +    return iter != null && iter.hasTop();
 +  }
 +  
 +  @Override
 +  public Entry<Key,Value> next() {
 +    try {
 +      byte[] v = iter.getTopValue().get();
 +      // copy just like tablet server does, do this before calling next
 +      KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
 +      
 +      iter.next();
 +      
 +      while (iter != null && !iter.hasTop())
 +        nextTablet();
 +      
 +      return ret;
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +  
 +  /**
 +   * @throws TableNotFoundException
 +   * @throws IOException
 +   * @throws AccumuloException
 +   * 
 +   */
 +  private void nextTablet() throws TableNotFoundException, AccumuloException, IOException
{
 +    
 +    Range nextRange = null;
 +    
 +    if (currentExtent == null) {
 +      Text startRow;
 +      
 +      if (range.getStartKey() != null)
 +        startRow = range.getStartKey().getRow();
 +      else
 +        startRow = new Text();
 +      
 +      nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(),
true, null, false);
 +    } else {
 +      
 +      if (currentExtent.getEndRow() == null) {
 +        iter = null;
 +        return;
 +      }
 +      
 +      if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW)))
{
 +        iter = null;
 +        return;
 +      }
 +      
 +      nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
 +    }
 +    
 +    List<String> relFiles = new ArrayList<String>();
 +    
 +    Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
 +    
 +    while (eloc.getSecond() != null) {
 +      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 +        Tables.clearCache(instance);
 +        if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 +          throw new AccumuloException("Table is online " + tableId + " cannot scan tablet
in offline mode " + eloc.getFirst());
 +        }
 +      }
 +      
 +      UtilWaitThread.sleep(250);
 +      
 +      eloc = getTabletFiles(nextRange, relFiles);
 +    }
 +    
 +    KeyExtent extent = eloc.getFirst();
 +    
 +    if (!extent.getTableId().toString().equals(tableId)) {
 +      throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
 +    }
 +    
 +    if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
 +      throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-     
-     String tablesDir = Constants.getTablesDir(instance.getConfiguration());
++
++    String tablesDir = Constants.getTablesDir(config);
 +    List<String> absFiles = new ArrayList<String>();
 +    for (String relPath : relFiles) {
 +      if (relPath.startsWith(".."))
 +        absFiles.add(tablesDir + relPath.substring(2));
 +      else
 +        absFiles.add(tablesDir + "/" + tableId + relPath);
 +    }
 +    
 +    iter = createIterator(extent, absFiles);
 +    iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size()
== 0 ? false : true);
 +    currentExtent = extent;
 +    
 +  }
 +  
 +  private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String>
relFiles) throws TableNotFoundException {
 +    Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +    scanner.setBatchSize(100);
 +    scanner.setRange(nextRange);
 +    
 +    RowIterator rowIter = new RowIterator(scanner);
 +    Iterator<Entry<Key,Value>> row = rowIter.next();
 +    
 +    KeyExtent extent = null;
 +    String location = null;
 +    
 +    while (row.hasNext()) {
 +      Entry<Key,Value> entry = row.next();
 +      Key key = entry.getKey();
 +      
 +      if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
 +        relFiles.add(key.getColumnQualifier().toString());
 +      }
 +      
 +      if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
 +          || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY))
{
 +        location = entry.getValue().toString();
 +      }
 +      
 +      if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
 +        extent = new KeyExtent(key.getRow(), entry.getValue());
 +      }
 +      
 +    }
 +    return new Pair<KeyExtent,String>(extent, location);
 +  }
 +  
 +  /**
 +   * @param absFiles
 +   * @return
 +   * @throws AccumuloException
 +   * @throws TableNotFoundException
 +   * @throws IOException
 +   */
 +  private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String>
absFiles) throws TableNotFoundException, AccumuloException,
 +      IOException {
 +    
 +    // TODO share code w/ tablet - ACCUMULO-1303
 +    AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn,
tableId);
 +    
 +    Configuration conf = CachedConfiguration.getInstance();
-     
-     FileSystem fs = FileUtil.getFileSystem(conf, instance.getConfiguration());
-     
++
++    FileSystem fs = FileUtil.getFileSystem(conf, config);
++
 +    for (SortedKeyValueIterator<Key,Value> reader : readers) {
 +      ((FileSKVIterator) reader).close();
 +    }
 +    
 +    readers.clear();
 +    
 +    // TODO need to close files - ACCUMULO-1303
 +    for (String file : absFiles) {
 +      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs,
conf, acuTableConf, null, null);
 +      readers.add(reader);
 +    }
 +    
 +    MultiIterator multiIter = new MultiIterator(readers, extent);
 +    
 +    OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment();
 +    
 +    DeletingIterator delIter = new DeletingIterator(multiIter, false);
 +    
 +    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
 +    
 +    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
 +    
 +    byte[] defaultSecurityLabel;
 +    
 +    ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
 +    defaultSecurityLabel = cv.getExpression();
 +    
 +    VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
 +    
 +    return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter,
extent, acuTableConf, options.serverSideIteratorList,
 +        options.serverSideIteratorOptions, iterEnv, false));
 +  }
 +  
 +  @Override
 +  public void remove() {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +}
 +
 +/**
 + * 
 + */
 +public class OfflineScanner extends ScannerOptions implements Scanner {
 +  
 +  private int batchSize;
 +  private int timeOut;
 +  private Range range;
 +  
 +  private Instance instance;
 +  private TCredentials credentials;
 +  private Authorizations authorizations;
 +  private Text tableId;
 +  
 +  public OfflineScanner(Instance instance, TCredentials credentials, String tableId, Authorizations
authorizations) {
 +    ArgumentChecker.notNull(instance, credentials, tableId, authorizations);
 +    this.instance = instance;
 +    this.credentials = credentials;
 +    this.tableId = new Text(tableId);
 +    this.range = new Range((Key) null, (Key) null);
 +    
 +    this.authorizations = authorizations;
 +    
 +    this.batchSize = Constants.SCAN_BATCH_SIZE;
 +    this.timeOut = Integer.MAX_VALUE;
 +  }
 +  
 +  @Deprecated
 +  @Override
 +  public void setTimeOut(int timeOut) {
 +    this.timeOut = timeOut;
 +  }
 +  
 +  @Deprecated
 +  @Override
 +  public int getTimeOut() {
 +    return timeOut;
 +  }
 +  
 +  @Override
 +  public void setRange(Range range) {
 +    this.range = range;
 +  }
 +  
 +  @Override
 +  public Range getRange() {
 +    return range;
 +  }
 +  
 +  @Override
 +  public void setBatchSize(int size) {
 +    this.batchSize = size;
 +  }
 +  
 +  @Override
 +  public int getBatchSize() {
 +    return batchSize;
 +  }
 +  
 +  @Override
 +  public void enableIsolation() {
 +    
 +  }
 +  
 +  @Override
 +  public void disableIsolation() {
 +    
 +  }
 +  
 +  @Override
 +  public Iterator<Entry<Key,Value>> iterator() {
 +    return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index f7b2263,0000000..75f140b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@@ -1,442 -1,0 +1,433 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.math.BigInteger;
 +import java.nio.charset.Charset;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 +import org.apache.accumulo.core.client.mock.MockInstance;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
- import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapreduce.InputSplit;
 +import org.apache.log4j.Level;
 +
 +/**
 + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
 + */
 +public class RangeInputSplit extends InputSplit implements Writable {
 +  private Range range;
 +  private String[] locations;
 +  private String table, instanceName, zooKeepers, principal;
 +  private AuthenticationToken token;
 +  private Boolean offline, mockInstance, isolatedScan, localIterators;
 +  private Authorizations auths;
 +  private Set<Pair<Text,Text>> fetchedColumns;
 +  private List<IteratorSetting> iterators;
 +  private Level level;
 +
 +  public RangeInputSplit() {
 +    range = new Range();
 +    locations = new String[0];
 +  }
 +
 +  public RangeInputSplit(Range range, String[] locations) {
 +    this.range = range;
 +    this.locations = locations;
 +  }
 +
 +  public Range getRange() {
 +    return range;
 +  }
 +
 +  private static byte[] extractBytes(ByteSequence seq, int numBytes) {
 +    byte[] bytes = new byte[numBytes + 1];
 +    bytes[0] = 0;
 +    for (int i = 0; i < numBytes; i++) {
 +      if (i >= seq.length())
 +        bytes[i + 1] = 0;
 +      else
 +        bytes[i + 1] = seq.byteAt(i);
 +    }
 +    return bytes;
 +  }
 +
 +  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position)
{
 +    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
 +    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
 +    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
 +    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
 +    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
 +  }
 +
 +  public float getProgress(Key currentKey) {
 +    if (currentKey == null)
 +      return 0f;
 +    if (range.getStartKey() != null && range.getEndKey() != null) {
 +      if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
 +        // just look at the row progress
 +        return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(),
currentKey.getRowData());
 +      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM)
!= 0) {
 +        // just look at the column family progress
 +        return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(),
currentKey.getColumnFamilyData());
 +      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)
!= 0) {
 +        // just look at the column qualifier progress
 +        return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(),
currentKey.getColumnQualifierData());
 +      }
 +    }
 +    // if we can't figure it out, then claim no progress
 +    return 0f;
 +  }
 +
 +  /**
 +   * This implementation of length is only an estimate, it does not provide exact values.
Do not have your code rely on this return value.
 +   */
 +  @Override
 +  public long getLength() throws IOException {
 +    Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) :
range.getStartKey().getRow();
 +    Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
 +    int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
 +    long diff = 0;
 +
 +    byte[] start = startRow.getBytes();
 +    byte[] stop = stopRow.getBytes();
 +    for (int i = 0; i < maxCommon; ++i) {
 +      diff |= 0xff & (start[i] ^ stop[i]);
 +      diff <<= Byte.SIZE;
 +    }
 +
 +    if (startRow.getLength() != stopRow.getLength())
 +      diff |= 0xff;
 +
 +    return diff + 1;
 +  }
 +
 +  @Override
 +  public String[] getLocations() throws IOException {
 +    return locations;
 +  }
 +
 +  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    range.readFields(in);
 +    int numLocs = in.readInt();
 +    locations = new String[numLocs];
 +    for (int i = 0; i < numLocs; ++i)
 +      locations[i] = in.readUTF();
 +
 +    if (in.readBoolean()) {
 +      isolatedScan = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      offline = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      localIterators = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      mockInstance = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      int numColumns = in.readInt();
 +      List<String> columns = new ArrayList<String>(numColumns);
 +      for (int i = 0; i < numColumns; i++) {
 +        columns.add(in.readUTF());
 +      }
 +
 +      fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
 +    }
 +
 +    if (in.readBoolean()) {
 +      String strAuths = in.readUTF();
 +      auths = new Authorizations(strAuths.getBytes(Charset.forName("UTF-8")));
 +    }
 +
 +    if (in.readBoolean()) {
 +      principal = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
 +      String tokenClass = in.readUTF();
 +      byte[] base64TokenBytes = in.readUTF().getBytes(Charset.forName("UTF-8"));
 +      byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
 +
 +      try {
 +        token = CredentialHelper.extractToken(tokenClass, tokenBytes);
 +      } catch (AccumuloSecurityException e) {
 +        throw new IOException(e);
 +      }
 +    }
 +
 +    if (in.readBoolean()) {
 +      instanceName = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
 +      zooKeepers = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
 +      level = Level.toLevel(in.readInt());
 +    }
 +  }
 +
 +  @Override
 +  public void write(DataOutput out) throws IOException {
 +    range.write(out);
 +    out.writeInt(locations.length);
 +    for (int i = 0; i < locations.length; ++i)
 +      out.writeUTF(locations[i]);
 +
 +    out.writeBoolean(null != isolatedScan);
 +    if (null != isolatedScan) {
 +      out.writeBoolean(isolatedScan);
 +    }
 +
 +    out.writeBoolean(null != offline);
 +    if (null != offline) {
 +      out.writeBoolean(offline);
 +    }
 +
 +    out.writeBoolean(null != localIterators);
 +    if (null != localIterators) {
 +      out.writeBoolean(localIterators);
 +    }
 +
 +    out.writeBoolean(null != mockInstance);
 +    if (null != mockInstance) {
 +      out.writeBoolean(mockInstance);
 +    }
 +
 +    out.writeBoolean(null != fetchedColumns);
 +    if (null != fetchedColumns) {
 +      String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
 +      out.writeInt(cols.length);
 +      for (String col : cols) {
 +        out.writeUTF(col);
 +      }
 +    }
 +
 +    out.writeBoolean(null != auths);
 +    if (null != auths) {
 +      out.writeUTF(auths.serialize());
 +    }
 +
 +    out.writeBoolean(null != principal);
 +    if (null != principal) {
 +      out.writeUTF(principal);
 +    }
 +
 +    out.writeBoolean(null != token);
 +    if (null != token) {
 +      out.writeUTF(token.getClass().getCanonicalName());
 +      try {
 +        out.writeUTF(CredentialHelper.tokenAsBase64(token));
 +      } catch (AccumuloSecurityException e) {
 +        throw new IOException(e);
 +      }
 +    }
 +
 +    out.writeBoolean(null != instanceName);
 +    if (null != instanceName) {
 +      out.writeUTF(instanceName);
 +    }
 +
 +    out.writeBoolean(null != zooKeepers);
 +    if (null != zooKeepers) {
 +      out.writeUTF(zooKeepers);
 +    }
 +
 +    out.writeBoolean(null != level);
 +    if (null != level) {
 +      out.writeInt(level.toInt());
 +    }
 +  }
 +
 +  @Override
 +  public String toString() {
 +    StringBuilder sb = new StringBuilder(256);
 +    sb.append("Range: ").append(range);
 +    sb.append(" Locations: ").append(Arrays.asList(locations));
 +    sb.append(" Table: ").append(table);
 +    sb.append(" InstanceName: ").append(instanceName);
 +    sb.append(" zooKeepers: ").append(zooKeepers);
 +    sb.append(" principal: ").append(principal);
 +    sb.append(" authenticationToken: ").append(token);
 +    sb.append(" Authorizations: ").append(auths);
 +    sb.append(" offlineScan: ").append(offline);
 +    sb.append(" mockInstance: ").append(mockInstance);
 +    sb.append(" isolatedScan: ").append(isolatedScan);
 +    sb.append(" localIterators: ").append(localIterators);
 +    sb.append(" fetchColumns: ").append(fetchedColumns);
 +    sb.append(" iterators: ").append(iterators);
 +    sb.append(" logLevel: ").append(level);
 +    return sb.toString();
 +  }
 +
 +  public String getTable() {
 +    return table;
 +  }
 +
 +  public void setTable(String table) {
 +    this.table = table;
 +  }
 +
 +  public Instance getInstance() {
 +    if (null == instanceName) {
 +      return null;
 +    }
 +
 +    if (isMockInstance()) {
 +      return new MockInstance(getInstanceName());
 +    }
 +
 +    if (null == zooKeepers) {
 +      return null;
 +    }
- 
-     ZooKeeperInstance zki = new ZooKeeperInstance(getInstanceName(), getZooKeepers());
- 
-     // Wrap the DefaultConfiguration with a SiteConfiguration so we use accumulo-site.xml
-     // when it's present
-     AccumuloConfiguration xmlConfig = SiteConfiguration.getInstance(zki.getConfiguration());
-     zki.setConfiguration(xmlConfig);
- 
-     return zki;
++    
++    return new ZooKeeperInstance(getInstanceName(), getZooKeepers());
 +  }
 +
 +  public String getInstanceName() {
 +    return instanceName;
 +  }
 +
 +  public void setInstanceName(String instanceName) {
 +    this.instanceName = instanceName;
 +  }
 +
 +  public String getZooKeepers() {
 +    return zooKeepers;
 +  }
 +
 +  public void setZooKeepers(String zooKeepers) {
 +    this.zooKeepers = zooKeepers;
 +  }
 +
 +  public String getPrincipal() {
 +    return principal;
 +  }
 +
 +  public void setPrincipal(String principal) {
 +    this.principal = principal;
 +  }
 +
 +  public AuthenticationToken getToken() {
 +    return token;
 +  }
 +
 +  public void setToken(AuthenticationToken token) {
 +    this.token = token;
 +    ;
 +  }
 +
 +  public Boolean isOffline() {
 +    return offline;
 +  }
 +
 +  public void setOffline(Boolean offline) {
 +    this.offline = offline;
 +  }
 +
 +  public void setLocations(String[] locations) {
 +    this.locations = locations;
 +  }
 +
 +  public Boolean isMockInstance() {
 +    return mockInstance;
 +  }
 +
 +  public void setMockInstance(Boolean mockInstance) {
 +    this.mockInstance = mockInstance;
 +  }
 +
 +  public Boolean isIsolatedScan() {
 +    return isolatedScan;
 +  }
 +
 +  public void setIsolatedScan(Boolean isolatedScan) {
 +    this.isolatedScan = isolatedScan;
 +  }
 +
 +  public Authorizations getAuths() {
 +    return auths;
 +  }
 +
 +  public void setAuths(Authorizations auths) {
 +    this.auths = auths;
 +  }
 +
 +  public void setRange(Range range) {
 +    this.range = range;
 +  }
 +
 +  public Boolean usesLocalIterators() {
 +    return localIterators;
 +  }
 +
 +  public void setUsesLocalIterators(Boolean localIterators) {
 +    this.localIterators = localIterators;
 +  }
 +
 +  public Set<Pair<Text,Text>> getFetchedColumns() {
 +    return fetchedColumns;
 +  }
 +
 +  public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
 +    this.fetchedColumns = fetchedColumns;
 +  }
 +
 +  public List<IteratorSetting> getIterators() {
 +    return iterators;
 +  }
 +
 +  public void setIterators(List<IteratorSetting> iterators) {
 +    this.iterators = iterators;
 +  }
 +
 +  public Level getLogLevel() {
 +    return level;
 +  }
 +
 +  public void setLogLevel(Level level) {
 +    this.level = level;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
index a38aecf,0000000..b1ae3a5
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
@@@ -1,281 -1,0 +1,275 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce.lib.util;
 +
 +import java.nio.charset.Charset;
 +
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.mock.MockInstance;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.util.ArgumentChecker;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.util.StringUtils;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * @since 1.5.0
 + */
 +public class ConfiguratorBase {
 +
 +  /**
 +   * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}.
 +   * 
 +   * @since 1.5.0
 +   */
 +  public static enum ConnectorInfo {
 +    IS_CONFIGURED, PRINCIPAL, TOKEN, TOKEN_CLASS
 +  }
 +
 +  /**
 +   * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}.
 +   * 
 +   * @since 1.5.0
 +   */
 +  protected static enum InstanceOpts {
 +    TYPE, NAME, ZOO_KEEPERS;
 +  }
 +
 +  /**
 +   * Configuration keys for general configuration options.
 +   * 
 +   * @since 1.5.0
 +   */
 +  protected static enum GeneralOpts {
 +    LOG_LEVEL
 +  }
 +
 +  /**
 +   * Provides a configuration key for a given feature enum, prefixed by the implementingClass
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param e
 +   *          the enum used to provide the unique part of the configuration key
 +   * @return the configuration key
 +   * @since 1.5.0
 +   */
 +  protected static String enumToConfKey(Class<?> implementingClass, Enum<?>
e) {
 +    return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName()
+ "." + StringUtils.camelize(e.name().toLowerCase());
 +  }
 +
 +  /**
 +   * Sets the connector information needed to communicate with Accumulo in this job.
 +   * 
 +   * <p>
 +   * <b>WARNING:</b> The serialized token is stored in the configuration and
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
 +   * conversion to a string, and is not intended to be secure.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param principal
 +   *          a valid Accumulo user name
 +   * @param token
 +   *          the user's password
 +   * @throws AccumuloSecurityException
 +   * @since 1.5.0
 +   */
 +  public static void setConnectorInfo(Class<?> implementingClass, Configuration conf,
String principal, AuthenticationToken token)
 +      throws AccumuloSecurityException {
 +    if (isConnectorInfoSet(implementingClass, conf))
 +      throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName()
+ " can only be set once per job");
 +
 +    ArgumentChecker.notNull(principal, token);
 +    conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
 +    conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
 +    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS), token.getClass().getCanonicalName());
 +    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), CredentialHelper.tokenAsBase64(token));
 +  }
 +
 +  /**
 +   * Determines if the connector info has already been set for this instance.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the connector info has already been set, false otherwise
 +   * @since 1.5.0
 +   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
 +   */
 +  public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration
conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED),
false);
 +  }
 +
 +  /**
 +   * Gets the user name from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the principal
 +   * @since 1.5.0
 +   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
 +   */
 +  public static String getPrincipal(Class<?> implementingClass, Configuration conf)
{
 +    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL));
 +  }
 +
 +  /**
 +   * Gets the serialized token class from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the principal
 +   * @since 1.5.0
 +   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
 +   */
 +  public static String getTokenClass(Class<?> implementingClass, Configuration conf)
{
 +    return conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS));
 +  }
 +
 +  /**
 +   * Gets the password from the configuration. WARNING: The password is stored in the Configuration
and shared with all MapReduce tasks; It is BASE64 encoded to
 +   * provide a charset safe conversion to a string, and is not intended to be secure.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the decoded principal's authentication token
 +   * @since 1.5.0
 +   * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
 +   */
 +  public static byte[] getToken(Class<?> implementingClass, Configuration conf) {
 +    return Base64.decodeBase64(conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN),
"").getBytes(Charset.forName("UTF-8")));
 +  }
 +
 +  /**
 +   * Configures a {@link ZooKeeperInstance} for this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param instanceName
 +   *          the Accumulo instance name
 +   * @param zooKeepers
 +   *          a comma-separated list of zookeeper servers
 +   * @since 1.5.0
 +   */
 +  public static void setZooKeeperInstance(Class<?> implementingClass, Configuration
conf, String instanceName, String zooKeepers) {
 +    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
 +    if (!conf.get(key, "").isEmpty())
 +      throw new IllegalStateException("Instance info can only be set once per job; it has
already been configured with " + conf.get(key));
 +    conf.set(key, "ZooKeeperInstance");
 +
 +    ArgumentChecker.notNull(instanceName, zooKeepers);
 +    conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
 +    conf.set(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS), zooKeepers);
 +  }
 +
 +  /**
 +   * Configures a {@link MockInstance} for this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param instanceName
 +   *          the Accumulo instance name
 +   * @since 1.5.0
 +   */
 +  public static void setMockInstance(Class<?> implementingClass, Configuration conf,
String instanceName) {
 +    String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
 +    if (!conf.get(key, "").isEmpty())
 +      throw new IllegalStateException("Instance info can only be set once per job; it has
already been configured with " + conf.get(key));
 +    conf.set(key, "MockInstance");
 +
 +    ArgumentChecker.notNull(instanceName);
 +    conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
 +  }
 +
 +  /**
 +   * Initializes an Accumulo {@link Instance} based on the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return an Accumulo instance
 +   * @since 1.5.0
 +   * @see #setZooKeeperInstance(Class, Configuration, String, String)
 +   * @see #setMockInstance(Class, Configuration, String)
 +   */
 +  public static Instance getInstance(Class<?> implementingClass, Configuration conf)
{
 +    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE),
"");
 +    if ("MockInstance".equals(instanceType))
 +      return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)));
 +    else if ("ZooKeeperInstance".equals(instanceType)) {
-       ZooKeeperInstance zki = new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass,
InstanceOpts.NAME)), conf.get(enumToConfKey(implementingClass,
++      return new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)),
conf.get(enumToConfKey(implementingClass,
 +          InstanceOpts.ZOO_KEEPERS)));
- 
-       // Wrap the DefaultConfiguration with a SiteConfiguration
-       AccumuloConfiguration xmlConfig = SiteConfiguration.getInstance(zki.getConfiguration());
-       zki.setConfiguration(xmlConfig);
- 
-       return zki;
 +    } else if (instanceType.isEmpty())
 +      throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName());
 +    else
 +      throw new IllegalStateException("Unrecognized instance type " + instanceType);
 +  }
 +
 +  /**
 +   * Sets the log level for this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param level
 +   *          the logging level
 +   * @since 1.5.0
 +   */
 +  public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level
level) {
 +    ArgumentChecker.notNull(level);
 +    Logger.getLogger(implementingClass).setLevel(level);
 +    conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt());
 +  }
 +
 +  /**
 +   * Gets the log level from this configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration
key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the log level
 +   * @since 1.5.0
 +   * @see #setLogLevel(Class, Configuration, Level)
 +   */
 +  public static Level getLogLevel(Class<?> implementingClass, Configuration conf)
{
 +    return Level.toLevel(conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL),
Level.INFO.toInt()));
 +  }
 +
 +}


Mime
View raw message