accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [4/6] ACCUMULO-2408 organize static final members of abstract class to a concrete implementation
Date Wed, 26 Feb 2014 19:40:38 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 8a74bc7,0000000..ccee661
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@@ -1,448 -1,0 +1,448 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.SortedSet;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.KeyValue;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.InitialScan;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.data.thrift.ScanResult;
 +import org.apache.accumulo.core.data.thrift.TKeyValue;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +
 +
 +public class ThriftScanner {
 +  private static final Logger log = Logger.getLogger(ThriftScanner.class);
 +  // per tablet type: the servers on which a scan has already been started with waitForWrites set to true
 +  public static final Map<TabletType,Set<String>> serversWaitedForWrites = new EnumMap<TabletType,Set<String>>(TabletType.class);
 +  // every tablet type gets its own synchronized set up front, so get(ttype) below always finds a set
 +  static {
 +    for (TabletType ttype : TabletType.values()) {
 +      serversWaitedForWrites.put(ttype, Collections.synchronizedSet(new HashSet<String>()));
 +    }
 +  }
 +  
 +  static boolean getBatchFromServer(TCredentials credentials, Range range, KeyExtent extent, String server, SortedMap<Key,Value> results,
 +      SortedSet<Column> fetchedColumns, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, int size,
 +      Authorizations authorizations, boolean retry, AccumuloConfiguration conf) throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
 +    if (server == null)
 +      throw new AccumuloException(new IOException());
 +    // starts a scan on server, copies the first batch into results and closes the scan; NOTE(review): the retry parameter is never read here
 +    try {
 +      TInfo tinfo = Tracer.traceInfo();
 +      TabletClientService.Client client = ThriftUtil.getTServerClient(server, conf);
 +      try {
 +        // not reading whole rows (or stopping on row boundaries) so there is no need to enable isolation below
 +        ScanState scanState = new ScanState(credentials, extent.getTableId(), authorizations, range, fetchedColumns, size, serverSideIteratorList,
 +            serverSideIteratorOptions, false);
 +        // waitForWrites is requested only while this server is not yet recorded in serversWaitedForWrites for the tablet type
 +        TabletType ttype = TabletType.type(extent);
 +        boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
 +        InitialScan isr = client.startScan(tinfo, scanState.credentials, extent.toThrift(), scanState.range.toThrift(),
-             Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
++            Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
 +            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
 +        if (waitForWrites)
 +          serversWaitedForWrites.get(ttype).add(server);
 +        // Key.decompress is applied to the returned batch before its entries are copied into results
 +        Key.decompress(isr.result.results);
 +        
 +        for (TKeyValue kv : isr.result.results)
 +          results.put(new Key(kv.key), new Value(kv.value));
 +        // only this one batch is fetched, so the scan is closed right away instead of being continued
 +        client.closeScan(tinfo, isr.scanID);
 +        // the return value is the "more" flag reported by the server for this batch
 +        return isr.result.more;
 +      } finally {
 +        ThriftUtil.returnClient((TServiceClient) client);
 +      }
 +    } catch (TApplicationException tae) {
 +      throw new AccumuloServerException(server, tae);
 +    } catch (TooManyFilesException e) {
 +      log.debug("Tablet (" + extent + ") has too many files " + server + " : " + e);
 +    } catch (ThriftSecurityException e) {
 +      log.warn("Security Violation in scan request to " + server + ": " + e);
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (TException e) {
 +      log.debug("Error getting transport to " + server + " : " + e);
 +    } 
 +    // reached only after TooManyFilesException or TException was logged above; NOTE(review): the caught exception is not attached as the cause
 +    throw new AccumuloException("getBatchFromServer: failed");
 +  }
 +  
 +  public static class ScanState {
 +    // mutable bookkeeping for one scan; ThriftScanner.scan reads and updates these fields between batches
 +    boolean isolated;
 +    Text tableName;
 +    Text startRow;
 +    boolean skipStartRow;
 +    // remaining range to scan; ThriftScanner.scan replaces it after a batch so it starts (exclusively) at the last key returned
 +    Range range;
 +    
 +    int size;
 +    
 +    TCredentials credentials;
 +    Authorizations authorizations;
 +    List<Column> columns;
 +    // location used by the previous scan call and the id of the scan currently open on it; scanID == null makes the next call start a new scan
 +    TabletLocation prevLoc;
 +    Long scanID;
 +    
 +    boolean finished = false;
 +    
 +    List<IterInfo> serverSideIteratorList;
 +    
 +    Map<String,Map<String,String>> serverSideIteratorOptions;
 +    
 +    public ScanState(TCredentials credentials, Text tableName, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
 +        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) {
 +      this.credentials = credentials;
 +      this.authorizations = authorizations;
 +      // the fetched columns are copied into a new list in the set's iteration order
 +      columns = new ArrayList<Column>(fetchedColumns.size());
 +      for (Column column : fetchedColumns) {
 +        columns.add(column);
 +      }
 +      
 +      this.tableName = tableName;
 +      this.range = range;
 +      // when the range has no start key, startRow is taken from a default-constructed Key
 +      Key startKey = range.getStartKey();
 +      if (startKey == null) {
 +        startKey = new Key();
 +      }
 +      this.startRow = startKey.getRow();
 +      
 +      this.skipStartRow = false;
 +      
 +      this.size = size;
 +      
 +      this.serverSideIteratorList = serverSideIteratorList;
 +      this.serverSideIteratorOptions = serverSideIteratorOptions;
 +      
 +      this.isolated = isolated;
 +      
 +    }
 +  }
 +  
 +  public static class ScanTimedOutException extends IOException {
 +    // thrown by scan(Instance, ...) once the elapsed time in seconds exceeds its timeOut argument
 +    private static final long serialVersionUID = 1L;
 +    
 +  }
 +  
 +  public static List<KeyValue> scan(Instance instance, TCredentials credentials, ScanState scanState, int timeOut, AccumuloConfiguration conf)
 +      throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    TabletLocation loc = null;
 +    // timeOut is compared against elapsed seconds measured from startTime
 +    long startTime = System.currentTimeMillis();
 +    String lastError = null;
 +    String error = null;
 +    int tooManyFilesCount = 0;
 +    
 +    List<KeyValue> results = null;
 +    
 +    Span span = Trace.start("scan");
 +    try {
 +      while (results == null && !scanState.finished) {
 +        // retry loop: runs until one batch has been fetched or the scan is marked finished
 +        if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut)
 +          throw new ScanTimedOutException();
 +        
 +        while (loc == null) {
 +          long currentTime = System.currentTimeMillis();
 +          if ((currentTime - startTime) / 1000.0 > timeOut)
 +            throw new ScanTimedOutException();
 +          
 +          Span locateSpan = Trace.start("scan:locateTablet");
 +          try {
 +            loc = TabletLocator.getInstance(instance, scanState.tableName).locateTablet(scanState.startRow, scanState.skipStartRow, false, credentials);
 +            if (loc == null) {
 +              if (!Tables.exists(instance, scanState.tableName.toString()))
 +                throw new TableDeletedException(scanState.tableName.toString());
 +              else if (Tables.getTableState(instance, scanState.tableName.toString()) == TableState.OFFLINE)
 +                throw new TableOfflineException(instance, scanState.tableName.toString());
 +              // a repeated identical error is logged at trace level only, to avoid flooding the debug log
 +              error = "Failed to locate tablet for table : " + scanState.tableName + " row : " + scanState.startRow;
 +              if (!error.equals(lastError))
 +                log.debug(error);
 +              else if (log.isTraceEnabled())
 +                log.trace(error);
 +              lastError = error;
 +              UtilWaitThread.sleep(100);
 +            } else {
 +              // when a tablet splits we do not want to continue scanning the low child
 +              // of the split if the scan range already starts past it
 +              Range dataRange = loc.tablet_extent.toDataRange();
 +              
 +              if (scanState.range.getStartKey() != null && dataRange.afterEndKey(scanState.range.getStartKey())) {
 +                // go to the next tablet
 +                scanState.startRow = loc.tablet_extent.getEndRow();
 +                scanState.skipStartRow = true;
 +                loc = null;
 +              } else if (scanState.range.getEndKey() != null && dataRange.beforeStartKey(scanState.range.getEndKey())) {
 +                // should not happen
 +                throw new RuntimeException("Unexpected tablet, extent : " + loc.tablet_extent + "  range : " + scanState.range + " startRow : "
 +                    + scanState.startRow);
 +              }
 +            }
 +          } catch (AccumuloServerException e) {
 +            log.debug("Scan failed, server side exception : " + e.getMessage());
 +            throw e;
 +          } catch (AccumuloException e) {
 +            error = "exception from tablet loc " + e.getMessage();
 +            if (!error.equals(lastError))
 +              log.debug(error);
 +            else if (log.isTraceEnabled())
 +              log.trace(error);
 +            
 +            lastError = error;
 +            UtilWaitThread.sleep(100);
 +          } finally {
 +            locateSpan.stop();
 +          }
 +        }
 +        // loc is non-null here; fetch one batch from that location
 +        Span scanLocation = Trace.start("scan:location");
 +        scanLocation.data("tserver", loc.tablet_location);
 +        try {
 +          results = scan(loc, scanState, conf);
 +        } catch (AccumuloSecurityException e) {
 +          Tables.clearCache(instance);
 +          if (!Tables.exists(instance, scanState.tableName.toString()))
 +            throw new TableDeletedException(scanState.tableName.toString());
 +          throw e;
 +        } catch (TApplicationException tae) {
 +          throw new AccumuloServerException(loc.tablet_location, tae);
 +        } catch (NotServingTabletException e) {
 +          error = "Scan failed, not serving tablet " + loc;
 +          if (!error.equals(lastError))
 +            log.debug(error);
 +          else if (log.isTraceEnabled())
 +            log.trace(error);
 +          lastError = error;
 +          // drop the cached location for this extent and clear loc so the tablet is located again
 +          TabletLocator.getInstance(instance, scanState.tableName).invalidateCache(loc.tablet_extent);
 +          loc = null;
 +          
 +          // no need to try the current scan id somewhere else
 +          scanState.scanID = null;
 +          
 +          if (scanState.isolated)
 +            throw new IsolationException();
 +          
 +          UtilWaitThread.sleep(100);
 +        } catch (NoSuchScanIDException e) {
 +          error = "Scan failed, no such scan id " + scanState.scanID + " " + loc;
 +          if (!error.equals(lastError))
 +            log.debug(error);
 +          else if (log.isTraceEnabled())
 +            log.trace(error);
 +          lastError = error;
 +          
 +          if (scanState.isolated)
 +            throw new IsolationException();
 +          // loc is kept, so the next iteration starts a new scan on the same location without sleeping
 +          scanState.scanID = null;
 +        } catch (TooManyFilesException e) {
 +          error = "Tablet has too many files " + loc + " retrying...";
 +          if (!error.equals(lastError)) {
 +            log.debug(error);
 +            tooManyFilesCount = 0;
 +          } else {
 +            tooManyFilesCount++;
 +            if (tooManyFilesCount == 300)
 +              log.warn(error);
 +            else if (log.isTraceEnabled())
 +              log.trace(error);
 +          }
 +          lastError = error;
 +          
 +          // not sure what state the scan session on the server side is
 +          // in after this occurs, so let's be cautious and start a new
 +          // scan session
 +          scanState.scanID = null;
 +          
 +          if (scanState.isolated)
 +            throw new IsolationException();
 +          
 +          UtilWaitThread.sleep(100);
 +        } catch (TException e) {
 +          TabletLocator.getInstance(instance, scanState.tableName).invalidateCache(loc.tablet_location);
 +          error = "Scan failed, thrift error " + e.getClass().getName() + "  " + e.getMessage() + " " + loc;
 +          if (!error.equals(lastError))
 +            log.debug(error);
 +          else if (log.isTraceEnabled())
 +            log.trace(error);
 +          lastError = error;
 +          loc = null;
 +          
 +          // do not want to continue using the same scan id; if a timeout occurred, reusing it could cause a batch to be skipped
 +          // because a thread on the server side may still be processing the timed out continue scan
 +          scanState.scanID = null;
 +          
 +          if (scanState.isolated)
 +            throw new IsolationException();
 +          
 +          UtilWaitThread.sleep(100);
 +        } finally {
 +          scanLocation.stop();
 +        }
 +      }
 +      // an empty batch from a scan that is now finished is reported to the caller as null
 +      if (results != null && results.size() == 0 && scanState.finished) {
 +        results = null;
 +      }
 +      
 +      return results;
 +    } finally {
 +      span.stop();
 +    }
 +  }
 +  
 +  private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, AccumuloConfiguration conf) throws AccumuloSecurityException,
 +      NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException {
 +    if (scanState.finished)
 +      return null;
 +    
 +    OpTimer opTimer = new OpTimer(log, Level.TRACE);
 +    
 +    TInfo tinfo = Tracer.traceInfo();
 +    TabletClientService.Client client = ThriftUtil.getTServerClient(loc.tablet_location, conf);
 +    // the thread is renamed to describe the call in progress; the finally block restores the saved name
 +    String old = Thread.currentThread().getName();
 +    try {
 +      ScanResult sr;
 +      // discard the scan id when the location differs from the one used by the previous call
 +      if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc))
 +        scanState.scanID = null;
 +      
 +      scanState.prevLoc = loc;
 +      
 +      if (scanState.scanID == null) {
 +        String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil="
 +            + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions;
 +        Thread.currentThread().setName(msg);
 +        opTimer.start(msg);
 +        // waitForWrites is requested only while this server is not yet recorded in serversWaitedForWrites for the tablet type
 +        TabletType ttype = TabletType.type(loc.tablet_extent);
 +        boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
 +        InitialScan is = client.startScan(tinfo, scanState.credentials, loc.tablet_extent.toThrift(), scanState.range.toThrift(),
-             Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
++            Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
 +            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
 +        if (waitForWrites)
 +          serversWaitedForWrites.get(ttype).add(loc.tablet_location);
 +        
 +        sr = is.result;
 +        // remember the scan id only when the server reports more data; otherwise close the scan immediately
 +        if (sr.more)
 +          scanState.scanID = is.scanID;
 +        else
 +          client.closeScan(tinfo, is.scanID);
 +        
 +      } else {
 +        // log.debug("Calling continue scan : "+scanState.range+"  loc = "+loc);
 +        String msg = "Continuing scan tserver=" + loc.tablet_location + " scanid=" + scanState.scanID;
 +        Thread.currentThread().setName(msg);
 +        opTimer.start(msg);
 +        
 +        sr = client.continueScan(tinfo, scanState.scanID);
 +        if (!sr.more) {
 +          client.closeScan(tinfo, scanState.scanID);
 +          scanState.scanID = null;
 +        }
 +      }
 +      // no more data in this tablet: either the whole scan is finished or the next call must move past this tablet's end row
 +      if (!sr.more) {
 +        // log.debug("No more : tab end row = "+loc.tablet_extent.getEndRow()+" range = "+scanState.range);
 +        if (loc.tablet_extent.getEndRow() == null) {
 +          scanState.finished = true;
 +          opTimer.stop("Completely finished scan in %DURATION% #results=" + sr.results.size());
 +        } else if (scanState.range.getEndKey() == null || !scanState.range.afterEndKey(new Key(loc.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
 +          scanState.startRow = loc.tablet_extent.getEndRow();
 +          scanState.skipStartRow = true;
 +          opTimer.stop("Finished scanning tablet in %DURATION% #results=" + sr.results.size());
 +        } else {
 +          scanState.finished = true;
 +          opTimer.stop("Completely finished scan in %DURATION% #results=" + sr.results.size());
 +        }
 +      } else {
 +        opTimer.stop("Finished scan in %DURATION% #results=" + sr.results.size() + " scanid=" + scanState.scanID);
 +      }
 +      
 +      Key.decompress(sr.results);
 +      // narrow the range to start (exclusively) at the last key returned, so a newly started scan does not return earlier keys again
 +      if (sr.results.size() > 0 && !scanState.finished)
 +        scanState.range = new Range(new Key(sr.results.get(sr.results.size() - 1).key), false, scanState.range.getEndKey(), scanState.range.isEndKeyInclusive());
 +      
 +      List<KeyValue> results = new ArrayList<KeyValue>(sr.results.size());
 +      for (TKeyValue tkv : sr.results)
 +        results.add(new KeyValue(new Key(tkv.key), tkv.value));
 +      
 +      return results;
 +      
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } finally {
 +      ThriftUtil.returnClient((TServiceClient) client);
 +      Thread.currentThread().setName(old);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
index aa5a3be,0000000..103199b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
@@@ -1,141 -1,0 +1,132 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.ConstraintViolationSummary;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.thrift.TColumn;
 +import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.data.thrift.TRange;
 +
 +public abstract class Translator<IT,OT> {
 +  // converts one input object of type IT into an output object of type OT
 +  public abstract OT translate(IT input);
 +  
 +  public static class TKeyExtentTranslator extends Translator<TKeyExtent,KeyExtent> {
 +    @Override
 +    public KeyExtent translate(TKeyExtent input) {
 +      return new KeyExtent(input);
 +    }
 +    
 +  }
 +  
 +  public static class KeyExtentTranslator extends Translator<KeyExtent,TKeyExtent> {
 +    @Override
 +    public TKeyExtent translate(KeyExtent input) {
 +      return input.toThrift();
 +    }
 +  }
 +  
 +  public static class TCVSTranslator extends Translator<TConstraintViolationSummary,org.apache.accumulo.core.data.ConstraintViolationSummary> {
 +    @Override
 +    public ConstraintViolationSummary translate(TConstraintViolationSummary input) {
 +      return new ConstraintViolationSummary(input);
 +    }
 +  }
 +  
 +  public static class CVSTranslator extends Translator<org.apache.accumulo.core.data.ConstraintViolationSummary,TConstraintViolationSummary> {
 +    @Override
 +    public TConstraintViolationSummary translate(ConstraintViolationSummary input) {
 +      return input.toThrift();
 +    }
 +  }
 +  
 +  public static class ColumnTranslator extends Translator<Column,TColumn> {
 +    @Override
 +    public TColumn translate(Column input) {
 +      return input.toThrift();
 +    }
 +  }
 +  
 +  public static class TRangeTranslator extends Translator<TRange,Range> {
 +    
 +    @Override
 +    public Range translate(TRange input) {
 +      return new Range(input);
 +    }
 +    
 +  }
 +  
 +  public static class RangeTranslator extends Translator<Range,TRange> {
 +    @Override
 +    public TRange translate(Range input) {
 +      return input.toThrift();
 +    }
 +  }
 +  // applies an element translator to whole lists by delegating to the static collection helper below
 +  public static class ListTranslator<IT,OT> extends Translator<List<IT>,List<OT>> {
 +    
 +    private Translator<IT,OT> translator;
 +    
 +    public ListTranslator(Translator<IT,OT> translator) {
 +      this.translator = translator;
 +    }
 +    
 +    @Override
 +    public List<OT> translate(List<IT> input) {
 +      return translate(input, this.translator);
 +    }
 +    
 +  }
 +  
-   public static final TKeyExtentTranslator TKET = new TKeyExtentTranslator();
-   public static final TCVSTranslator TCVST = new TCVSTranslator();
-   public static final TRangeTranslator TRT = new TRangeTranslator();
-   
-   public static final KeyExtentTranslator KET = new KeyExtentTranslator();
-   public static final ColumnTranslator CT = new ColumnTranslator();
-   public static final Translator<Range,TRange> RT = new RangeTranslator();
-   public static final CVSTranslator CVST = new CVSTranslator();
-   
 +  public static <IKT,OKT,T> Map<OKT,T> translate(Map<IKT,T> input, Translator<IKT,OKT> keyTranslator) {
 +    HashMap<OKT,T> output = new HashMap<OKT,T>();
 +    // only the keys are translated; each value is carried over unchanged
 +    for (Entry<IKT,T> entry : input.entrySet())
 +      output.put(keyTranslator.translate(entry.getKey()), entry.getValue());
 +    
 +    return output;
 +  }
 +  
 +  public static <IKT,OKT,IVT,OVT> Map<OKT,OVT> translate(Map<IKT,IVT> input, Translator<IKT,OKT> keyTranslator, Translator<IVT,OVT> valueTranslator) {
 +    HashMap<OKT,OVT> output = new HashMap<OKT,OVT>();
 +    // both the key and the value of every entry are translated
 +    for (Entry<IKT,IVT> entry : input.entrySet())
 +      output.put(keyTranslator.translate(entry.getKey()), valueTranslator.translate(entry.getValue()));
 +    
 +    return output;
 +  }
 +  
 +  public static <IT,OT> List<OT> translate(Collection<IT> input, Translator<IT,OT> translator) {
 +    ArrayList<OT> output = new ArrayList<OT>(input.size());
 +    // elements are appended in the iteration order of the input collection
 +    for (IT in : input)
 +      output.add(translator.translate(in));
 +    
 +    return output;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/core/src/main/java/org/apache/accumulo/core/client/impl/Translators.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/Translators.java
index 0000000,0000000..c433ae7
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Translators.java
@@@ -1,0 -1,0 +1,37 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.client.impl;
++
++import org.apache.accumulo.core.client.impl.Translator.CVSTranslator;
++import org.apache.accumulo.core.client.impl.Translator.ColumnTranslator;
++import org.apache.accumulo.core.client.impl.Translator.KeyExtentTranslator;
++import org.apache.accumulo.core.client.impl.Translator.RangeTranslator;
++import org.apache.accumulo.core.client.impl.Translator.TCVSTranslator;
++import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
++import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
++import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.data.thrift.TRange;
++
++public class Translators { // holds one shared instance of each concrete Translator nested class imported above
++  public static final TKeyExtentTranslator TKET = new TKeyExtentTranslator();
++  public static final TCVSTranslator TCVST = new TCVSTranslator();
++  public static final TRangeTranslator TRT = new TRangeTranslator();
++  public static final KeyExtentTranslator KET = new KeyExtentTranslator();
++  public static final ColumnTranslator CT = new ColumnTranslator();
++  public static final Translator<Range,TRange> RT = new RangeTranslator(); // declared with the abstract Translator type, unlike the other constants
++  public static final CVSTranslator CVST = new CVSTranslator();
++}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index f6846ac,0000000..5423302
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@@ -1,768 -1,0 +1,769 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.client;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.ServerClient;
 +import org.apache.accumulo.core.client.impl.TabletLocator;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 +import org.apache.accumulo.core.client.impl.Translator;
++import org.apache.accumulo.core.client.impl.Translators;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.core.util.StopWatch;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.trace.instrument.TraceRunnable;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TServiceClient;
 +
 +
 +public class BulkImporter {
 +  
 +  // log4j logger shared by the importer and its nested helper classes
 +  private static final Logger log = Logger.getLogger(BulkImporter.class);
 +  
 +  /**
 +   * Convenience entry point: creates a {@link BulkImporter}, runs {@link #importFiles(List, Path)} and reports the files that
 +   * were given up on.
 +   * 
 +   * @param errorDir
 +   *          directory passed to the import as its failure directory
 +   * @return the string form of every path the import recorded as a complete failure
 +   */
 +  public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, TCredentials creds, long tid, String tableId, List<String> files,
 +      String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
 +    BulkImporter importer = new BulkImporter(conf, instance, creds, tid, tableId, setTime);
 +    Path failureDir = new Path(errorDir);
 +    AssignmentStats stats = importer.importFiles(files, failureDir);
 +    
 +    List<String> failedFiles = new ArrayList<String>();
 +    Iterator<Path> failed = stats.completeFailures.keySet().iterator();
 +    while (failed.hasNext())
 +      failedFiles.add(failed.next().toString());
 +    return failedFiles;
 +  }
 +  
 +  // per-phase stop watch; created at the start of importFiles and read by printReport
 +  private StopWatch<Timers> timer;
 +  
 +  // the phases that are timed; printReport treats TOTAL as the whole run and reports the unaccounted remainder as "Misc"
 +  private static enum Timers {
 +    EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
 +  }
 +  
 +  // all of the following are set once by the constructor
 +  private Instance instance;
 +  private TCredentials credentials;
 +  private String tableId;
 +  // passed through unchanged to the tablet server's bulkImport call
 +  private long tid;
 +  private AccumuloConfiguration acuConf;
 +  // passed through unchanged to the tablet server's bulkImport call
 +  private boolean setTime;
 +  
 +  /**
 +   * Stores the settings used by {@link #importFiles(List, Path)}; no work is done here.
 +   * 
 +   * @param conf
 +   *          configuration consulted for thread counts and the retry limit
 +   * @param tid
 +   *          id forwarded to the tablet servers with every bulk import request
 +   * @param setTime
 +   *          flag forwarded to the tablet servers with every bulk import request
 +   */
 +  public BulkImporter(AccumuloConfiguration conf, Instance instance, TCredentials credentials, long tid, String tableId, boolean setTime) {
 +    // configuration and connection context
 +    this.acuConf = conf;
 +    this.instance = instance;
 +    this.credentials = credentials;
 +    // what is being imported, and how
 +    this.tableId = tableId;
 +    this.tid = tid;
 +    this.setTime = setTime;
 +  }
 +  
 +  /**
 +   * Assigns each of the given map files to the tablets it overlaps and asks the hosting tablet servers to load them, retrying
 +   * failed extents with a growing sleep between rounds.
 +   * <p>
 +   * A file that overlaps no tablet, or whose failure count exceeds {@link Property#TSERV_BULK_RETRY}, is recorded as a complete
 +   * failure. The locator cache is invalidated when the method exits, normally or exceptionally.
 +   * 
 +   * @param files
 +   *          paths of the map files to import
 +   * @param failureDir
 +   *          must already exist, otherwise a RuntimeException is thrown
 +   * @return statistics about the attempted, failed and abandoned assignments
 +   */
 +  public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
 +      ThriftTableOperationException {
 +    
 +    int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
 +    int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
 +    
 +    timer = new StopWatch<Timers>(Timers.class);
 +    timer.start(Timers.TOTAL);
 +    
 +    Configuration conf = CachedConfiguration.getInstance();
 +    final FileSystem fs = FileSystem.get(conf);
 +    
 +    Set<Path> paths = new HashSet<Path>();
 +    for (String file : files) {
 +      paths.add(new Path(file));
 +    }
 +    AssignmentStats assignmentStats = new AssignmentStats(paths.size());
 +    
 +    final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
 +    
 +    if (!fs.exists(failureDir)) {
 +      log.error(failureDir + " does not exist");
 +      throw new RuntimeException("Directory does not exist " + failureDir);
 +    }
 +    
 +    final TabletLocator locator = TabletLocator.getInstance(instance, new Text(tableId));
 +    
 +    try {
 +      final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
 +      
 +      // phase 1: find, in parallel, the tablets each map file overlaps
 +      timer.start(Timers.EXAMINE_MAP_FILES);
 +      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
 +      
 +      for (Path path : paths) {
 +        final Path mapFile = path;
 +        Runnable getAssignments = new Runnable() {
 +          public void run() {
 +            List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
 +            try {
 +              tabletsToAssignMapFileTo = findOverlappingTablets(instance.getConfiguration(), fs, locator, mapFile, credentials);
 +            } catch (Exception ex) {
 +              // keep the cause in the log; the file is treated below as overlapping no tablets
 +              log.warn("Unable to find tablets that overlap file " + mapFile.toString(), ex);
 +            }
 +            log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
 +            if (tabletsToAssignMapFileTo.size() == 0) {
 +              List<KeyExtent> empty = Collections.emptyList();
 +              completeFailures.put(mapFile, empty);
 +            } else
 +              assignments.put(mapFile, tabletsToAssignMapFileTo);
 +          }
 +        };
 +        threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
 +      }
 +      threadPool.shutdown();
 +      while (!threadPool.isTerminated()) {
 +        try {
 +          threadPool.awaitTermination(60, TimeUnit.SECONDS);
 +        } catch (InterruptedException e) {
 +          // restore the interrupt status so code further up the stack can still observe it
 +          Thread.currentThread().interrupt();
 +          throw new RuntimeException(e);
 +        }
 +      }
 +      timer.stop(Timers.EXAMINE_MAP_FILES);
 +      
 +      // phase 2: first assignment attempt
 +      assignmentStats.attemptingAssignments(assignments);
 +      Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
 +          numThreads);
 +      assignmentStats.assignmentsFailed(assignmentFailures);
 +      
 +      Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
 +      
 +      for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
 +        failureCount.put(entry.getKey(), 1);
 +      
 +      // phase 3: retry failed extents; the sleep doubles each round and is capped at 60 seconds
 +      long sleepTime = 2*1000;
 +      while (assignmentFailures.size() > 0) {
 +        sleepTime = Math.min(sleepTime*2, 60*1000);
 +        locator.invalidateCache();
 +        // assumption about assignment failures is that it caused by a split
 +        // happening or a missing location
 +        //
 +        // for splits we need to find children key extents that cover the
 +        // same key range and are contiguous (no holes, no overlap)
 +        
 +        timer.start(Timers.SLEEP);
 +        UtilWaitThread.sleep(sleepTime);
 +        timer.stop(Timers.SLEEP);
 +        
 +        log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
 +        assignments.clear();
 +        
 +        // for failed key extents, try to find children key extents to
 +        // assign to
 +        for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
 +          Iterator<KeyExtent> keListIter = entry.getValue().iterator();
 +          
 +          List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
 +          
 +          while (keListIter.hasNext()) {
 +            KeyExtent ke = keListIter.next();
 +            
 +            try {
 +              timer.start(Timers.QUERY_METADATA);
 +              try {
 +                tabletsToAssignMapFileTo.addAll(findOverlappingTablets(instance.getConfiguration(), fs, locator, entry.getKey(), ke, credentials));
 +              } finally {
 +                // stop the timer on the failure path too, so the next lookup does not start a timer that is still running
 +                timer.stop(Timers.QUERY_METADATA);
 +              }
 +              keListIter.remove();
 +            } catch (Exception ex) {
 +              // the extent stays in the failure list and is retried in the next round
 +              log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
 +            }
 +          }
 +          
 +          if (tabletsToAssignMapFileTo.size() > 0)
 +            assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
 +        }
 +        
 +        assignmentStats.attemptingAssignments(assignments);
 +        Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
 +            numThreads);
 +        assignmentStats.assignmentsFailed(assignmentFailures2);
 +        
 +        // merge assignmentFailures2 into assignmentFailures
 +        for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
 +          assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
 +          
 +          Integer fc = failureCount.get(entry.getKey());
 +          if (fc == null)
 +            fc = 0;
 +          
 +          failureCount.put(entry.getKey(), fc + 1);
 +        }
 +        
 +        // remove map files that have no more key extents to assign
 +        Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
 +        while (afIter.hasNext()) {
 +          Entry<Path,List<KeyExtent>> entry = afIter.next();
 +          if (entry.getValue().size() == 0)
 +            afIter.remove();
 +        }
 +        
 +        // give up on files that have failed more often than the configured retry limit
 +        int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
 +        Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
 +        for (Entry<Path,Integer> entry : failureIter) {
 +          if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
 +            log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
 +            completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
 +            assignmentFailures.remove(entry.getKey());
 +          }
 +        }
 +      }
 +      assignmentStats.assignmentsAbandoned(completeFailures);
 +      Set<Path> failedFailures = processFailures(conf, fs, failureDir, completeFailures);
 +      assignmentStats.unrecoveredMapFiles(failedFailures);
 +      
 +      timer.stop(Timers.TOTAL);
 +      printReport();
 +      return assignmentStats;
 +    } finally {
 +      locator.invalidateCache();
 +    }
 +  }
 +  
 +  private void printReport() {
 +    long totalTime = 0;
 +    for (Timers t : Timers.values()) {
 +      if (t == Timers.TOTAL)
 +        continue;
 +      
 +      totalTime += timer.get(t);
 +    }
 +    
 +    log.debug("BULK IMPORT TIMING STATISTICS");
 +    log.debug(String.format("Examine map files    : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), 100.0 * timer.get(Timers.EXAMINE_MAP_FILES)
 +        / timer.get(Timers.TOTAL), "%"));
 +    log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", Constants.METADATA_TABLE_NAME, timer.getSecs(Timers.QUERY_METADATA),
 +        100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
 +    log.debug(String.format("Import Map Files     : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES), 100.0 * timer.get(Timers.IMPORT_MAP_FILES)
 +        / timer.get(Timers.TOTAL), "%"));
 +    log.debug(String.format("Sleep                : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
 +        100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
 +    log.debug(String.format("Misc                 : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - totalTime) / 1000.0, 100.0
 +        * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
 +    log.debug(String.format("Total                : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
 +  }
 +  
 +  /**
 +   * Lists, at debug level, every map file that was abandoned together with the extents it failed on.
 +   * 
 +   * @return always an empty set; the conf, fs and failureDir arguments are not used by the current implementation
 +   */
 +  private Set<Path> processFailures(Configuration conf, FileSystem fs, Path failureDir, Map<Path,List<KeyExtent>> completeFailures) {
 +    // we should check if map file was not assigned to any tablets, then we
 +    // should just move it; not currently being done?
 +    
 +    if (completeFailures.size() != 0) {
 +      log.debug("The following map files failed ");
 +      
 +      for (Entry<Path,List<KeyExtent>> failure : completeFailures.entrySet()) {
 +        Path mapFile = failure.getKey();
 +        for (KeyExtent extent : failure.getValue())
 +          log.debug("\t" + mapFile + " -> " + extent);
 +      }
 +    }
 +    
 +    return Collections.emptySet();
 +  }
 +  
 +  /**
 +   * Immutable pair of a tablet extent and the estimated size of the part of a map file assigned to it.
 +   * <p>
 +   * Declared static because it never touches the enclosing importer; a non-static inner class would carry a hidden reference
 +   * to it.
 +   */
 +  private static class AssignmentInfo {
 +    final KeyExtent ke;
 +    final long estSize;
 +    
 +    /**
 +     * @param estSize
 +     *          estimated size; unboxed on assignment, so it must not be null
 +     */
 +    public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
 +      this.ke = keyExtent;
 +      this.estSize = estSize;
 +    }
 +  }
 +  
 +  /**
 +   * Projects a list of tablet locations onto their extents, keeping the order.
 +   */
 +  private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
 +    List<KeyExtent> extents = new ArrayList<KeyExtent>(locations.size());
 +    Iterator<TabletLocation> remaining = locations.iterator();
 +    while (remaining.hasNext())
 +      extents.add(remaining.next().tablet_extent);
 +    return extents;
 +  }
 +  
 +  /**
 +   * Works out, for every map file in {@code assignments}, how much of the file goes to each tablet it was assigned to.
 +   * <p>
 +   * A file assigned to a single tablet is charged to that tablet in full. For the others {@link FileUtil#estimateSizes} is
 +   * run on a thread pool; if that throws an IOException the file size is split evenly across the file's tablets.
 +   * 
 +   * @param paths
 +   *          all map files of the import; their lengths are looked up before any estimation starts
 +   * @param numThreads
 +   *          size of the estimation thread pool
 +   * @return per map file, one {@link AssignmentInfo} for each tablet it is assigned to
 +   */
 +  private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final FileSystem fs,
 +      Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
 +    
 +    long t1 = System.currentTimeMillis();
 +    final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
 +    
 +    try {
 +      for (Path path : paths) {
 +        mapFileSizes.put(path, fs.getContentSummary(path).getLength());
 +      }
 +    } catch (IOException e) {
 +      log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
 +      throw new RuntimeException(e);
 +    }
 +    
 +    // written to by the pool threads, hence the synchronized wrapper
 +    final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
 +    
 +    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
 +    
 +    for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
 +      if (entry.getValue().size() == 1) {
 +        TabletLocation tabletLocation = entry.getValue().get(0);
 +        
 +        // if the tablet completely contains the map file, there is no
 +        // need to estimate its
 +        // size
 +        ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
 +        continue;
 +      }
 +      
 +      Runnable estimationTask = new Runnable() {
 +        public void run() {
 +          Map<KeyExtent,Long> estimatedSizes = null;
 +          
 +          try {
 +            estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, fs);
 +          } catch (IOException e) {
 +            log.warn("Failed to estimate map file sizes " + e.getMessage());
 +          }
 +          
 +          if (estimatedSizes == null) {
 +            // estimation failed, do a simple estimation
 +            estimatedSizes = new TreeMap<KeyExtent,Long>();
 +            long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
 +            for (TabletLocation tl : entry.getValue())
 +              estimatedSizes.put(tl.tablet_extent, estSize);
 +          }
 +          
 +          List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
 +          
 +          for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
 +            assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
 +          
 +          ais.put(entry.getKey(), assignmentInfoList);
 +        }
 +      };
 +      
 +      threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
 +    }
 +    
 +    threadPool.shutdown();
 +    
 +    while (!threadPool.isTerminated()) {
 +      try {
 +        threadPool.awaitTermination(60, TimeUnit.SECONDS);
 +      } catch (InterruptedException e) {
 +        // restore the interrupt status; the exception itself travels on as the cause of the RuntimeException
 +        Thread.currentThread().interrupt();
 +        throw new RuntimeException(e);
 +      }
 +    }
 +    
 +    long t2 = System.currentTimeMillis();
 +    
 +    log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
 +    
 +    return ais;
 +  }
 +  
 +  /**
 +   * Builds an extent to location lookup from every tablet location mentioned in {@code assignments}. When an extent appears
 +   * more than once, the location seen last wins.
 +   */
 +  private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
 +    Map<KeyExtent,String> locationByExtent = new HashMap<KeyExtent,String>();
 +    for (Entry<Path,List<TabletLocation>> perFile : assignments.entrySet()) {
 +      for (TabletLocation loc : perFile.getValue())
 +        locationByExtent.put(loc.tablet_extent, loc.tablet_location);
 +    }
 +    return locationByExtent;
 +  }
 +  
 +  /**
 +   * Runs one assignment round: estimates how the map files split across their tablets, then sends the assignments to the
 +   * tablet servers. The two steps are charged to the EXAMINE_MAP_FILES and IMPORT_MAP_FILES timers respectively.
 +   * 
 +   * @param numThreads
 +   *          thread count for sending the assignments
 +   * @param numMapThreads
 +   *          thread count for the size estimation
 +   * @return per map file, the extents that could not be assigned in this round
 +   */
 +  private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, TCredentials credentials, FileSystem fs,
 +      String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
 +    timer.start(Timers.EXAMINE_MAP_FILES);
 +    Map<Path,List<AssignmentInfo>> sizedAssignments = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
 +    timer.stop(Timers.EXAMINE_MAP_FILES);
 +    
 +    timer.start(Timers.IMPORT_MAP_FILES);
 +    Map<KeyExtent,String> tabletLocations = locationsOf(assignments);
 +    Map<Path,List<KeyExtent>> failures = assignMapFiles(credentials, tableId, sizedAssignments, tabletLocations, numThreads);
 +    timer.stop(Timers.IMPORT_MAP_FILES);
 +    
 +    return failures;
 +  }
 +  
 +  /**
 +   * Unit of work for one tablet server: sends it all the map files destined for its tablets and records every extent that
 +   * could not be loaded in the shared failure map.
 +   */
 +  private class AssignmentTask implements Runnable {
 +    final Map<Path,List<KeyExtent>> assignmentFailures;
 +    String location;
 +    TCredentials credentials;
 +    private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
 +    
 +    /**
 +     * @param assignmentFailures
 +     *          shared map, updated under its own monitor, that collects the failed extents per map file
 +     * @param tableName
 +     *          accepted but not used
 +     */
 +    public AssignmentTask(TCredentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
 +        Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
 +      this.credentials = credentials;
 +      this.location = location;
 +      this.assignmentsPerTablet = assignmentsPerTablet;
 +      this.assignmentFailures = assignmentFailures;
 +    }
 +    
 +    /**
 +     * Adds each failed extent to the failure list of every map file that was destined for it, and logs the reason.
 +     */
 +    private void handleFailures(Collection<KeyExtent> failures, String message) {
 +      for (KeyExtent failedExtent : failures) {
 +        List<PathSize> filesForTablet = assignmentsPerTablet.get(failedExtent);
 +        synchronized (assignmentFailures) {
 +          for (PathSize ps : filesForTablet) {
 +            List<KeyExtent> recorded = assignmentFailures.get(ps.path);
 +            if (recorded != null) {
 +              recorded.add(failedExtent);
 +              continue;
 +            }
 +            
 +            // first failure seen for this map file
 +            recorded = new ArrayList<KeyExtent>();
 +            recorded.add(failedExtent);
 +            assignmentFailures.put(ps.path, recorded);
 +          }
 +        }
 +        
 +        log.info("Could not assign " + filesForTablet.size() + " map files to tablet " + failedExtent + " because : " + message + ".  Will retry ...");
 +      }
 +    }
 +    
 +    public void run() {
 +      // distinct map files, counted for the log line only
 +      HashSet<Path> distinctFiles = new HashSet<Path>();
 +      for (List<PathSize> perTablet : assignmentsPerTablet.values()) {
 +        for (PathSize ps : perTablet)
 +          distinctFiles.add(ps.path);
 +      }
 +      
 +      log.debug("Assigning " + distinctFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
 +      
 +      try {
 +        handleFailures(assignMapFiles(credentials, location, assignmentsPerTablet), "Not Serving Tablet");
 +      } catch (AccumuloException e) {
 +        // the whole request failed, so every tablet in it counts as failed
 +        handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
 +      } catch (AccumuloSecurityException e) {
 +        handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
 +      }
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Immutable pair of a map file path and the estimated size of the part of that file assigned to one tablet.
 +   * <p>
 +   * Declared static because it never touches the enclosing importer.
 +   */
 +  private static class PathSize {
 +    final Path path;
 +    final long estSize;
 +    
 +    public PathSize(Path mapFile, long estSize) {
 +      this.path = mapFile;
 +      this.estSize = estSize;
 +    }
 +    
 +    /** Renders as the path, a single space, then the estimated size. */
 +    @Override
 +    public String toString() {
 +      return path + " " + estSize;
 +    }
 +  }
 +  
 +  /**
 +   * Regroups the per-file assignments first by tablet and then by tablet server, and submits one {@link AssignmentTask} per
 +   * server to a thread pool, waiting until all of them have finished.
 +   * <p>
 +   * Tablets without an entry in {@code locations} are not sent anywhere; they are recorded as failures straight away.
 +   * 
 +   * @param assignments
 +   *          per map file, the tablets it goes to and the estimated size for each
 +   * @param locations
 +   *          tablet server location for each extent
 +   * @param numThreads
 +   *          size of the thread pool
 +   * @return per map file, the extents that could not be assigned
 +   */
 +  private Map<Path,List<KeyExtent>> assignMapFiles(TCredentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
 +      Map<KeyExtent,String> locations, int numThreads) {
 +    
 +    // group assignments by tablet
 +    Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
 +    for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
 +      Path mapFile = entry.getKey();
 +      List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
 +      
 +      for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
 +        List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
 +        if (mapFiles == null) {
 +          mapFiles = new ArrayList<PathSize>();
 +          assignmentsPerTablet.put(ai.ke, mapFiles);
 +        }
 +        
 +        mapFiles.add(new PathSize(mapFile, ai.estSize));
 +      }
 +    }
 +    
 +    // group assignments by tabletserver
 +    
 +    // shared with the AssignmentTasks, which add to it while holding its monitor
 +    Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
 +    
 +    TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
 +    
 +    for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
 +      KeyExtent ke = entry.getKey();
 +      String location = locations.get(ke);
 +      
 +      if (location == null) {
 +        for (PathSize pathSize : entry.getValue()) {
 +          synchronized (assignmentFailures) {
 +            List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
 +            if (failures == null) {
 +              failures = new ArrayList<KeyExtent>();
 +              assignmentFailures.put(pathSize.path, failures);
 +            }
 +            
 +            failures.add(ke);
 +          }
 +        }
 +        
 +        log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
 +        
 +        continue;
 +      }
 +      
 +      Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
 +      if (apt == null) {
 +        apt = new TreeMap<KeyExtent,List<PathSize>>();
 +        assignmentsPerTabletServer.put(location, apt);
 +      }
 +      
 +      apt.put(entry.getKey(), entry.getValue());
 +    }
 +    
 +    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
 +    
 +    for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
 +      String location = entry.getKey();
 +      threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
 +    }
 +    
 +    threadPool.shutdown();
 +    
 +    while (!threadPool.isTerminated()) {
 +      try {
 +        threadPool.awaitTermination(60, TimeUnit.SECONDS);
 +      } catch (InterruptedException e) {
 +        // restore the interrupt status; the exception itself travels on as the cause of the RuntimeException
 +        Thread.currentThread().interrupt();
 +        throw new RuntimeException(e);
 +      }
 +    }
 +    
 +    return assignmentFailures;
 +  }
 +  
 +  /**
 +   * Sends one bulk import request to the tablet server at {@code location}, covering every tablet in
 +   * {@code assignmentsPerTablet}.
 +   * 
 +   * @param assignmentsPerTablet
 +   *          per tablet, the map files (with estimated sizes) to load into it
 +   * @return the extents returned by the server's bulkImport call, translated from their thrift form
 +   * @throws AccumuloSecurityException
 +   *           when the call fails with a ThriftSecurityException
 +   * @throws AccumuloException
 +   *           wrapping any other Throwable
 +   */
 +  private List<KeyExtent> assignMapFiles(TCredentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException,
 +      AccumuloSecurityException {
 +    try {
 +      long timeInMillis = instance.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
 +      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeInMillis);
 +      try {
 +        // request payload: extent -> (file path -> file info carrying the estimated size)
 +        HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
 +        for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
 +          HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
 +          files.put(entry.getKey(), tabletFiles);
 +          
 +          for (PathSize pathSize : entry.getValue()) {
 +            org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
 +            // only the path component of the URI is used as the key
 +            tabletFiles.put(pathSize.path.toUri().getPath().toString(), mfi);
 +          }
 +        }
 +        
 +        log.debug("Asking " + location + " to bulk load " + files);
 -         List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials, tid, Translator.translate(files, Translator.KET), setTime);
++        List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials, tid, Translator.translate(files, Translators.KET), setTime);
 +        
 -         return Translator.translate(failures, Translator.TKET);
++        return Translator.translate(failures, Translators.TKET);
 +      } finally {
 +        // hand the client back whether or not the call succeeded
 +        ThriftUtil.returnClient((TServiceClient) client);
 +      }
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (Throwable t) {
 +      t.printStackTrace();
 +      throw new AccumuloException(t);
 +    }
 +  }
 +  
 +  /**
 +   * Finds the tablets overlapped by {@code file} without restricting the row range: delegates to the seven-argument overload
 +   * with a null start row and a null end row.
 +   */
 +  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, TCredentials credentials) throws Exception {
 +    final Text unbounded = null;
 +    return findOverlappingTablets(acuConf, fs, locator, file, unbounded, unbounded, credentials);
 +  }
 +  
 +  /**
 +   * Finds the tablets overlapped by {@code file} within the row range of a previously failed extent. The extent is first
 +   * invalidated in the locator cache; the search then starts at {@code Range.followingPrefix} of the extent's previous end row
 +   * (or with no start row if there is none) and ends at the extent's end row.
 +   */
 +  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, KeyExtent failed, TCredentials credentials)
 +      throws Exception {
 +    locator.invalidateCache(failed);
 +    Text prevEndRow = failed.getPrevEndRow();
 +    Text start = (prevEndRow == null) ? null : Range.followingPrefix(prevEndRow);
 +    return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
 +  }
 +  
 +  // a single zero byte; findOverlappingTablets appends it to a tablet's end row to form the next row to seek to
 +  final static byte[] byte0 = {0};
 +
 +  public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, Text startRow,
 +      Text endRow, TCredentials credentials) throws Exception {
 +    List<TabletLocation> result = new ArrayList<TabletLocation>();
 +    Collection<ByteSequence> columnFamilies = Collections.emptyList();
 +    String filename = file.toString();
 +    // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
 +    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
 +    try {
 +      Text row = startRow;
 +      if (row == null)
 +        row = new Text();
 +      while (true) {
 +        // log.debug(filename + " Seeking to row " + row);
 +        reader.seek(new Range(row, null), columnFamilies, false);
 +        if (!reader.hasTop()) {
 +          // log.debug(filename + " not found");
 +          break;
 +        }
 +        row = reader.getTopKey().getRow();
 +        TabletLocation tabletLocation = locator.locateTablet(row, false, true, credentials);
 +        // log.debug(filename + " found row " + row + " at location " + tabletLocation);
 +        result.add(tabletLocation);
 +        row = tabletLocation.tablet_extent.getEndRow();
 +        if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
 +          row = new Text(row);
 +          row.append(byte0, 0, byte0.length);
 +        } else
 +          break;
 +      }
 +    } finally {
 +      reader.close();
 +    }
 +    // log.debug(filename + " to be sent to " + result);
 +    return result;
 +  }
 +  
 +  public static class AssignmentStats {
 +    private Map<KeyExtent,Integer> counts;
 +    private int numUniqueMapFiles;
 +    private Map<Path,List<KeyExtent>> completeFailures = null;
 +    private Set<Path> failedFailures = null;
 +    
 +    AssignmentStats(int fileCount) {
 +      counts = new HashMap<KeyExtent,Integer>();
 +      numUniqueMapFiles = fileCount;
 +    }
 +    
 +    void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
 +      for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
 +        for (TabletLocation tl : entry.getValue()) {
 +          
 +          Integer count = getCount(tl.tablet_extent);
 +          
 +          counts.put(tl.tablet_extent, count + 1);
 +        }
 +      }
 +    }
 +    
 +    void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
 +      for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
 +        for (KeyExtent ke : entry.getValue()) {
 +          
 +          Integer count = getCount(ke);
 +          
 +          counts.put(ke, count - 1);
 +        }
 +      }
 +    }
 +    
 +    void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
 +      this.completeFailures = completeFailures;
 +    }
 +    
 +    void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
 +      Integer count = getCount(parent);
 +      
 +      counts.remove(parent);
 +      
 +      for (KeyExtent keyExtent : children)
 +        counts.put(keyExtent, count);
 +    }
 +    
 +    private Integer getCount(KeyExtent parent) {
 +      Integer count = counts.get(parent);
 +      
 +      if (count == null) {
 +        count = 0;
 +      }
 +      return count;
 +    }
 +    
 +    void unrecoveredMapFiles(Set<Path> failedFailures) {
 +      this.failedFailures = failedFailures;
 +    }
 +    
 +    public String toString() {
 +      StringBuilder sb = new StringBuilder();
 +      int totalAssignments = 0;
 +      int tabletsImportedTo = 0;
 +      
 +      int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
 +      
 +      for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
 +        totalAssignments += entry.getValue();
 +        if (entry.getValue() > 0)
 +          tabletsImportedTo++;
 +        
 +        if (entry.getValue() < min)
 +          min = entry.getValue();
 +        
 +        if (entry.getValue() > max)
 +          max = entry.getValue();
 +      }
 +      
 +      double stddev = 0;
 +      
 +      for (Entry<KeyExtent,Integer> entry : counts.entrySet())
 +        stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
 +      
 +      stddev = stddev / counts.size();
 +      stddev = Math.sqrt(stddev);
 +      
 +      Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
 +      for (List<KeyExtent> ft : completeFailures.values())
 +        failedTablets.addAll(ft);
 +      
 +      sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
 +      sb.append(String.format("# of map files            : %,10d%n", numUniqueMapFiles));
 +      sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
 +      sb.append(String.format("# failed failed map files : %,10d %s%n", failedFailures.size(), failedFailures.size() > 0 ? " <-- THIS IS BAD" : ""));
 +      sb.append(String.format("# of tablets              : %,10d%n", counts.size()));
 +      sb.append(String.format("# tablets imported to     : %,10d %6.2f%s%n", tabletsImportedTo, tabletsImportedTo * 100.0 / counts.size(), "%"));
 +      sb.append(String.format("# tablets with failures   : %,10d %6.2f%s%n", failedTablets.size(), failedTablets.size() * 100.0 / counts.size(), "%"));
 +      sb.append(String.format("min map files per tablet  : %,10d%n", min));
 +      sb.append(String.format("max map files per tablet  : %,10d%n", max));
 +      sb.append(String.format("avg map files per tablet  : %,10.2f (std dev = %.2f)%n", totalAssignments / (double) counts.size(), stddev));
 +      return sb.toString();
 +    }
 +  }
 +  
 +}


Mime
View raw message