accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [09/50] [abbrv] Merge branch '1.5' into 1.6
Date Sat, 01 Nov 2014 04:57:03 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 1226806,0000000..10cf224
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -1,617 -1,0 +1,619 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.BufferedReader;
 +import java.io.BufferedWriter;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.OutputStreamWriter;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.impl.ServerClient;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 +import org.apache.accumulo.trace.instrument.TraceExecutorService;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.MapFile;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +
 +/*
 + * Bulk import makes requests of tablet servers, and those requests can take a
 + * long time. Our communication with the tablet server may fail, leaving the
 + * status of the request unknown. The master repeats failed requests, so the
 + * tablet server may receive the same request multiple times. The tablet server
 + * will not execute a request more than once, as long as the marker it wrote in
 + * the metadata table stays there. The master needs to know when all requests
 + * have finished so it can remove the markers. Did a request start? Did it
 + * finish? We can see that *a* request completed from the flag written into the
 + * metadata table, but we cannot tell whether some other rogue thread is still
 + * waiting to start and repeat the operation.
 + *
 + * The master can ask the tablet server if it has any requests still running.
 + * However, the tablet server might have a thread that is about to start a
 + * request but has not yet recorded any bookkeeping for it. To prevent problems
 + * like this, an Arbitrator is used: before starting any new request, the tablet
 + * server checks the Arbitrator to see if the request is still valid.
 + * 
 + */
 +
 +public class BulkImport extends MasterRepo {
 +  public static final String FAILURES_TXT = "failures.txt";
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static final Logger log = Logger.getLogger(BulkImport.class);
 +
 +  private String tableId;
 +  private String sourceDir;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.sourceDir = sourceDir;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (!Utils.getReadLock(tableId, tid).tryLock())
 +      return 100;
 +
 +    Instance instance = HdfsZooInstance.getInstance();
 +    Tables.clearCache(instance);
 +    if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
 +      long reserve1, reserve2;
 +      reserve1 = reserve2 = Utils.reserveHdfsDirectory(sourceDir, tid);
 +      if (reserve1 == 0)
 +        reserve2 = Utils.reserveHdfsDirectory(errorDir, tid);
 +      return reserve2;
 +    } else {
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
 +    }
 +  }
 +
 +  @Override
 +  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +  @SuppressWarnings("deprecation")
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    log.debug(" tid " + tid + " sourceDir " + sourceDir);
 +
 +    Utils.getReadLock(tableId, tid).lock();
 +
 +    // check that the error directory exists and is empty
 +    VolumeManager fs = master.getFileSystem();
 +
 +    Path errorPath = new Path(errorDir);
 +    FileStatus errorStatus = null;
 +    try {
 +      errorStatus = fs.getFileStatus(errorPath);
 +    } catch (FileNotFoundException ex) {
 +      // ignored
 +    }
 +    if (errorStatus == null)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " does not exist");
 +    if (!errorStatus.isDir())
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " is not a directory");
 +    if (fs.listStatus(errorPath).length != 0)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " is not empty");
 +
 +    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
 +
 +    // move the files into the directory
 +    try {
 +      String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
 +      log.debug(" tid " + tid + " bulkDir " + bulkDir);
 +      return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
 +    } catch (IOException ex) {
 +      log.error("error preparing the bulk import directory", ex);
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": "
 +          + ex);
 +    }
 +  }
 +
 +  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
 +    Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
 +    if (tempPath == null)
 +      throw new IllegalStateException(sourceDir + " is not in a known namespace");
 +
 +    String tableDir = tempPath.toString();
 +    if (tableDir == null)
 +      throw new IllegalStateException(sourceDir + " is not in a known namespace");
 +    Path directory = new Path(tableDir + "/" + tableId);
 +    fs.mkdirs(directory);
 +
 +    // only one should be able to create the lock file
 +    // the purpose of the lock file is to avoid a race
 +    // condition between the call to fs.exists() and
 +    // fs.mkdirs()... if only hadoop had a mkdir() function
 +    // that failed when the dir existed
 +
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +
 +    while (true) {
 +      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
 +      if (fs.exists(newBulkDir)) // sanity check
 +        throw new IllegalStateException("Dir exists when it should not " + newBulkDir);
 +      if (fs.mkdirs(newBulkDir))
 +        return newBulkDir;
 +      log.warn("Failed to create " + newBulkDir + " for unknown reason");
 +
 +      UtilWaitThread.sleep(3000);
 +    }
 +  }
 +
 +  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +  @SuppressWarnings("deprecation")
 +  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
 +    Path bulkDir = createNewBulkDir(fs, tableId);
 +
 +    MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 +
 +    Path dirPath = new Path(dir);
 +    FileStatus[] mapFiles = fs.listStatus(dirPath);
 +
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +
 +    for (FileStatus fileStatus : mapFiles) {
 +      String sa[] = fileStatus.getPath().getName().split("\\.");
 +      String extension = "";
 +      if (sa.length > 1) {
 +        extension = sa[sa.length - 1];
 +
 +        if (!FileOperations.getValidExtensions().contains(extension)) {
 +          log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
 +          continue;
 +        }
 +      } else {
 +        // assume it is a map file
 +        extension = Constants.MAPFILE_EXTENSION;
 +      }
 +
 +      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
 +        if (!fileStatus.isDir()) {
 +          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +          continue;
 +        }
 +
 +        if (fileStatus.getPath().getName().equals("_logs")) {
 +          log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
 +          continue;
 +        }
 +        try {
 +          FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
 +          if (dataStatus.isDir()) {
 +            log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +            continue;
 +          }
 +        } catch (FileNotFoundException fnfe) {
 +          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +          continue;
 +        }
 +      }
 +
 +      String newName = "I" + namer.getNextName() + "." + extension;
 +      Path newPath = new Path(bulkDir, newName);
 +      try {
 +        fs.rename(fileStatus.getPath(), newPath);
 +        log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
 +      } catch (IOException E1) {
 +        log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
 +      }
 +    }
 +    return bulkDir.toString();
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    // unreserve source/error directories
 +    Utils.unreserveHdfsDirectory(sourceDir, tid);
 +    Utils.unreserveHdfsDirectory(errorDir, tid);
 +    Utils.getReadLock(tableId, tid).unlock();
 +  }
 +}
 +
 +class CleanUpBulkImport extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +
 +  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    log.debug("removing the bulk processing flag file in " + bulk);
 +    Path bulkDir = new Path(bulk);
 +    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 +    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
 +    log.debug("removing the metadata table markers for loaded files");
 +    Connector conn = master.getConnector();
 +    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
 +    log.debug("releasing HDFS reservations for " + source + " and " + error);
 +    Utils.unreserveHdfsDirectory(source, tid);
 +    Utils.unreserveHdfsDirectory(error, tid);
 +    Utils.getReadLock(tableId, tid).unlock();
 +    log.debug("completing bulk import transaction " + tid);
 +    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
 +    return null;
 +  }
 +}
 +
 +class CompleteBulkImport extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +
 +  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
 +    return new CopyFailed(tableId, source, bulk, error);
 +  }
 +}
 +
 +class CopyFailed extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +
 +  public CopyFailed(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    Set<TServerInstance> finished = new HashSet<TServerInstance>();
 +    Set<TServerInstance> running = master.onlineTabletServers();
 +    for (TServerInstance server : running) {
 +      try {
 +        TServerConnection client = master.getConnection(server);
 +        if (client != null && !client.isActive(tid))
 +          finished.add(server);
 +      } catch (TException ex) {
 +        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
 +      }
 +    }
 +    if (finished.containsAll(running))
 +      return 0;
 +    return 500;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    // This needs to execute after the arbiter is stopped
 +
 +    VolumeManager fs = master.getFileSystem();
 +
 +    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
 +      return new CleanUpBulkImport(tableId, source, bulk, error);
 +
 +    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
 +    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
 +
 +    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
-     BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
++    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
 +    try {
 +      String line = null;
 +      while ((line = in.readLine()) != null) {
 +        Path path = new Path(line);
 +        if (!fs.exists(new Path(error, path.getName())))
 +          failures.put(new FileRef(line, path), line);
 +      }
 +    } finally {
 +      failFile.close();
 +    }
 +
 +    /*
 +     * I thought I could move files that have no file references in the table. However, it's possible a clone references a file. Therefore only move files that
 +     * have no loaded markers.
 +     */
 +
 +    // determine which failed files were loaded
 +    Connector conn = master.getConnector();
 +    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 +    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
 +    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 +
 +    for (Entry<Key,Value> entry : mscanner) {
 +      if (Long.parseLong(entry.getValue().toString()) == tid) {
 +        FileRef loadedFile = new FileRef(fs, entry.getKey());
 +        String absPath = failures.remove(loadedFile);
 +        if (absPath != null) {
 +          loadedFailures.put(loadedFile, absPath);
 +        }
 +      }
 +    }
 +
 +    // move failed files that were not loaded
 +    for (String failure : failures.values()) {
 +      Path orig = new Path(failure);
 +      Path dest = new Path(error, orig.getName());
 +      fs.rename(orig, dest);
 +      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
 +    }
 +
 +    if (loadedFailures.size() > 0) {
 +      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
 +          + Constants.ZBULK_FAILED_COPYQ);
 +
 +      HashSet<String> workIds = new HashSet<String>();
 +
 +      for (String failure : loadedFailures.values()) {
 +        Path orig = new Path(failure);
 +        Path dest = new Path(error, orig.getName());
 +
 +        if (fs.exists(dest))
 +          continue;
 +
-         bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
++        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
 +        workIds.add(orig.getName());
 +        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
 +      }
 +
 +      bifCopyQueue.waitUntilDone(workIds);
 +    }
 +
 +    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
 +    return new CleanUpBulkImport(tableId, source, bulk, error);
 +  }
 +
 +}
 +
 +class LoadFiles extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static ExecutorService threadPool = null;
 +  private static final Logger log = Logger.getLogger(BulkImport.class);
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (master.onlineTabletServers().size() == 0)
 +      return 500;
 +    return 0;
 +  }
 +
 +  private static synchronized ExecutorService getThreadPool(Master master) {
 +    if (threadPool == null) {
 +      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 +      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 +      pool.allowCoreThreadTimeOut(true);
 +      threadPool = new TraceExecutorService(pool);
 +    }
 +    return threadPool;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, final Master master) throws Exception {
 +    ExecutorService executor = getThreadPool(master);
 +    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
 +    VolumeManager fs = master.getFileSystem();
 +    List<FileStatus> files = new ArrayList<FileStatus>();
 +    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 +      files.add(entry);
 +    }
 +    log.debug("tid " + tid + " importing " + files.size() + " files");
 +
 +    Path writable = new Path(this.errorDir, ".iswritable");
 +    if (!fs.createNewFile(writable)) {
 +      // Maybe this is a re-try... clear the flag and try again
 +      fs.delete(writable);
 +      if (!fs.createNewFile(writable))
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +            "Unable to write to " + this.errorDir);
 +    }
 +    fs.delete(writable);
 +
 +    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 +    for (FileStatus f : files)
 +      filesToLoad.add(f.getPath().toString());
 +
 +    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 +    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 +      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 +
 +      if (master.onlineTabletServers().size() == 0)
 +        log.warn("There are no tablet servers to process bulk import, waiting (tid = " + tid + ")");
 +
 +      while (master.onlineTabletServers().size() == 0) {
 +        UtilWaitThread.sleep(500);
 +      }
 +
 +      // Use the threadpool to assign files one-at-a-time to the server
 +      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 +      for (final String file : filesToLoad) {
 +        results.add(executor.submit(new Callable<List<String>>() {
 +          @Override
 +          public List<String> call() {
 +            List<String> failures = new ArrayList<String>();
 +            ClientService.Client client = null;
 +            String server = null;
 +            try {
 +              // get a connection to a random tablet server, do not prefer cached connections because
 +              // this is running on the master and there are lots of connections to tablet servers
 +              // serving the metadata tablets
 +              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 +              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
 +              client = pair.getSecond();
 +              server = pair.getFirst();
 +              List<String> attempt = Collections.singletonList(file);
 +              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
 +              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
 +                  errorDir, setTime);
 +              if (fail.isEmpty()) {
 +                loaded.add(file);
 +              } else {
 +                failures.addAll(fail);
 +              }
 +            } catch (Exception ex) {
 +              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 +            } finally {
 +              ServerClient.close(client);
 +            }
 +            return failures;
 +          }
 +        }));
 +      }
 +      Set<String> failures = new HashSet<String>();
 +      for (Future<List<String>> f : results)
 +        failures.addAll(f.get());
 +      filesToLoad.removeAll(loaded);
 +      if (filesToLoad.size() > 0) {
 +        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 +        UtilWaitThread.sleep(100);
 +      }
 +    }
 +
 +    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-     BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
++    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 +    try {
 +      for (String f : filesToLoad) {
 +        out.write(f);
 +        out.write("\n");
 +      }
 +    } finally {
 +      out.close();
 +    }
 +
 +    // return the next step, which will perform cleanup
 +    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 +  }
 +
 +  static String sampleList(Collection<?> potentiallyLongList, int max) {
 +    StringBuffer result = new StringBuffer();
 +    result.append("[");
 +    int i = 0;
 +    for (Object obj : potentiallyLongList) {
 +      result.append(obj);
 +      if (i >= max) {
 +        result.append("...");
 +        break;
 +      } else {
 +        result.append(", ");
 +      }
 +      i++;
 +    }
 +    if (i < max)
 +      result.delete(result.length() - 2, result.length());
 +    result.append("]");
 +    return result.toString();
 +  }
 +
 +}
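
The class comment in BulkImport above describes the arbitration idea: the master may resend a bulk-import request after a communication failure, so a tablet server must check, immediately before starting any work, whether the transaction is still valid. Below is a minimal, in-memory sketch of that check-before-work pattern. It is hypothetical and is not Accumulo's ZooArbitrator, which keeps this state in ZooKeeper and is driven by the ZooArbitrator.start/stop/cleanup calls in the FATE steps above.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the arbitration pattern described in the BulkImport
// class comment; not an Accumulo API.
public class ArbitratorSketch {
  private final Set<Long> activeTransactions = ConcurrentHashMap.newKeySet();

  // master side: begin arbitration for a transaction
  void start(long tid) {
    activeTransactions.add(tid);
  }

  // master side: every request for the transaction is known to be finished
  void cleanup(long tid) {
    activeTransactions.remove(tid);
  }

  // tablet-server side: called immediately before doing any work for tid
  boolean mayStart(long tid) {
    return activeTransactions.contains(tid);
  }

  public static void main(String[] args) {
    ArbitratorSketch arb = new ArbitratorSketch();
    arb.start(42L);
    System.out.println(arb.mayStart(42L)); // true: the request may proceed
    arb.cleanup(42L);
    System.out.println(arb.mayStart(42L)); // false: a late retry becomes a no-op
  }
}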

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index ebceaa0,0000000..fd2441c
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@@ -1,105 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +class FinishCancelCompaction extends MasterRepo {
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +
 +  public FinishCancelCompaction(String tableId) {
 +    this.tableId = tableId;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    Utils.getReadLock(tableId, tid).unlock();
 +    return null;
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +
 +  }
 +}
 +
 +/**
 + *
 + */
 +public class CancelCompactions extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +  private String namespaceId;
 +
 +  public CancelCompactions(String tableId) {
 +    this.tableId = tableId;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL)
 +        + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    String zCompactID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +    String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
 +        + Constants.ZTABLE_COMPACT_CANCEL_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +    byte[] currentValue = zoo.getData(zCompactID, null);
 +
-     String cvs = new String(currentValue, Constants.UTF8);
++    String cvs = new String(currentValue, UTF_8);
 +    String[] tokens = cvs.split(",");
 +    final long flushID = Long.parseLong(tokens[0]);
 +
 +    zoo.mutate(zCancelID, null, null, new Mutator() {
 +      @Override
 +      public byte[] mutate(byte[] currentValue) throws Exception {
-         long cid = Long.parseLong(new String(currentValue, Constants.UTF8));
++        long cid = Long.parseLong(new String(currentValue, UTF_8));
 +
 +        if (cid < flushID)
-           return Long.toString(flushID).getBytes(Constants.UTF8);
++          return Long.toString(flushID).getBytes(UTF_8);
 +        else
-           return Long.toString(cid).getBytes(Constants.UTF8);
++          return Long.toString(cid).getBytes(UTF_8);
 +      }
 +    });
 +
 +    return new FinishCancelCompaction(tableId);
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    Utils.unreserveNamespace(namespaceId, tid, false);
 +    Utils.unreserveTable(tableId, tid, false);
 +  }
 +}
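
A change that recurs throughout this merge, in both files above and the ones below, is replacing Constants.UTF8 with a static import of Guava's com.google.common.base.Charsets.UTF_8 when encoding and decoding bytes. A minimal, self-contained illustration of the resulting idiom (the string literal is a placeholder, not a value from the code above):

import static com.google.common.base.Charsets.UTF_8;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

public class Utf8IdiomExample {
  public static void main(String[] args) throws IOException {
    // Encode with an explicit charset rather than the platform default.
    byte[] bytes = "failures.txt\n".getBytes(UTF_8);

    // Decode the same way when reading the bytes back.
    BufferedReader in = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes), UTF_8));
    System.out.println(in.readLine());
    in.close();
  }
}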

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index a643ccb,0000000..fbaa70d
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@@ -1,400 -1,0 +1,402 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.commons.codec.binary.Hex;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +
 +class CompactionDriver extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private long compactId;
 +  private String tableId;
 +  private byte[] startRow;
 +  private byte[] endRow;
 +  private String namespaceId;
 +
 +  public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {
 +
 +    this.compactId = compactId;
 +    this.tableId = tableId;
 +    this.startRow = startRow;
 +    this.endRow = endRow;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +
 +    String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
 +        + Constants.ZTABLE_COMPACT_CANCEL_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +    if (Long.parseLong(new String(zoo.getData(zCancelID, null), UTF_8)) >= compactId) {
 +      // compaction was canceled
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
 +    }
 +
 +    MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
 +    Connector conn = master.getConnector();
 +
 +    Scanner scanner;
 +
 +    if (tableId.equals(MetadataTable.ID)) {
 +      scanner = new IsolatedScanner(conn.createScanner(RootTable.NAME, Authorizations.EMPTY));
 +      scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    } else {
 +      scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 +      Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
 +      scanner.setRange(range);
 +    }
 +
 +    TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
 +    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
 +    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
 +
 +    long t1 = System.currentTimeMillis();
 +    RowIterator ri = new RowIterator(scanner);
 +
 +    int tabletsToWaitFor = 0;
 +    int tabletCount = 0;
 +
 +    while (ri.hasNext()) {
 +      Iterator<Entry<Key,Value>> row = ri.next();
 +      long tabletCompactID = -1;
 +
 +      TServerInstance server = null;
 +
 +      Entry<Key,Value> entry = null;
 +      while (row.hasNext()) {
 +        entry = row.next();
 +        Key key = entry.getKey();
 +
 +        if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
 +          tabletCompactID = Long.parseLong(entry.getValue().toString());
 +
 +        if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
 +          server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
 +      }
 +
 +      if (tabletCompactID < compactId) {
 +        tabletsToWaitFor++;
 +        if (server != null)
 +          serversToFlush.increment(server, 1);
 +      }
 +
 +      tabletCount++;
 +
 +      Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
 +      if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
 +        break;
 +    }
 +
 +    long scanTime = System.currentTimeMillis() - t1;
 +
 +    Instance instance = master.getInstance();
 +    Tables.clearCache(instance);
 +    if (tabletCount == 0 && !Tables.exists(instance, tableId))
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +
 +    if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);
 +
 +    if (tabletsToWaitFor == 0)
 +      return 0;
 +
 +    for (TServerInstance tsi : serversToFlush.keySet()) {
 +      try {
 +        final TServerConnection server = master.getConnection(tsi);
 +        if (server != null)
 +          server.compact(master.getMasterLock(), tableId, startRow, endRow);
 +      } catch (TException ex) {
 +        Logger.getLogger(CompactionDriver.class).error(ex.toString());
 +      }
 +    }
 +
 +    long sleepTime = 500;
 +
 +    if (serversToFlush.size() > 0)
 +      sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to
 +                                                                        // compact
 +
 +    sleepTime = Math.max(2 * scanTime, sleepTime);
 +
 +    sleepTime = Math.min(sleepTime, 30000);
 +
 +    return sleepTime;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    CompactRange.removeIterators(tid, tableId);
 +    Utils.getReadLock(tableId, tid).unlock();
 +    Utils.getReadLock(namespaceId, tid).unlock();
 +    return null;
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +
 +  }
 +
 +}
 +
 +public class CompactRange extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +  private byte[] startRow;
 +  private byte[] endRow;
 +  private byte[] iterators;
 +  private String namespaceId;
 +
 +  public static class CompactionIterators implements Writable {
 +    byte[] startRow;
 +    byte[] endRow;
 +    List<IteratorSetting> iterators;
 +
 +    public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
 +      this.startRow = startRow;
 +      this.endRow = endRow;
 +      this.iterators = iterators;
 +    }
 +
 +    public CompactionIterators() {
 +      startRow = null;
 +      endRow = null;
 +      iterators = Collections.emptyList();
 +    }
 +
 +    @Override
 +    public void write(DataOutput out) throws IOException {
 +      out.writeBoolean(startRow != null);
 +      if (startRow != null) {
 +        out.writeInt(startRow.length);
 +        out.write(startRow);
 +      }
 +
 +      out.writeBoolean(endRow != null);
 +      if (endRow != null) {
 +        out.writeInt(endRow.length);
 +        out.write(endRow);
 +      }
 +
 +      out.writeInt(iterators.size());
 +      for (IteratorSetting is : iterators) {
 +        is.write(out);
 +      }
 +    }
 +
 +    @Override
 +    public void readFields(DataInput in) throws IOException {
 +      if (in.readBoolean()) {
 +        startRow = new byte[in.readInt()];
 +        in.readFully(startRow);
 +      } else {
 +        startRow = null;
 +      }
 +
 +      if (in.readBoolean()) {
 +        endRow = new byte[in.readInt()];
 +        in.readFully(endRow);
 +      } else {
 +        endRow = null;
 +      }
 +
 +      int num = in.readInt();
 +      iterators = new ArrayList<IteratorSetting>(num);
 +
 +      for (int i = 0; i < num; i++) {
 +        iterators.add(new IteratorSetting(in));
 +      }
 +    }
 +
 +    public Text getEndRow() {
 +      if (endRow == null)
 +        return null;
 +      return new Text(endRow);
 +    }
 +
 +    public Text getStartRow() {
 +      if (startRow == null)
 +        return null;
 +      return new Text(startRow);
 +    }
 +
 +    public List<IteratorSetting> getIterators() {
 +      return iterators;
 +    }
 +  }
 +
 +  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
 +    this.tableId = tableId;
 +    this.startRow = startRow.length == 0 ? null : startRow;
 +    this.endRow = endRow.length == 0 ? null : endRow;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +
 +    if (iterators.size() > 0) {
 +      this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
 +    } else {
 +      this.iterators = null;
 +    }
 +
 +    if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
 +          "start row must be less than end row");
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT)
 +        + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, Master environment) throws Exception {
 +    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    byte[] cid;
 +    try {
 +      cid = zoo.mutate(zTablePath, null, null, new Mutator() {
 +        @Override
 +        public byte[] mutate(byte[] currentValue) throws Exception {
-           String cvs = new String(currentValue, Constants.UTF8);
++          String cvs = new String(currentValue, UTF_8);
 +          String[] tokens = cvs.split(",");
 +          long flushID = Long.parseLong(tokens[0]);
 +          flushID++;
 +
 +          String txidString = String.format("%016x", tid);
 +
 +          for (int i = 1; i < tokens.length; i++) {
 +            if (tokens[i].startsWith(txidString))
 +              continue; // skip self
 +
 +            throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
 +                "Another compaction with iterators is running");
 +          }
 +
 +          StringBuilder encodedIterators = new StringBuilder();
 +
 +          if (iterators != null) {
 +            Hex hex = new Hex();
 +            encodedIterators.append(",");
 +            encodedIterators.append(txidString);
 +            encodedIterators.append("=");
-             encodedIterators.append(new String(hex.encode(iterators), Constants.UTF8));
++            encodedIterators.append(new String(hex.encode(iterators), UTF_8));
 +          }
 +
-           return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
++          return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8);
 +        }
 +      });
 +
-       return new CompactionDriver(Long.parseLong(new String(cid, Constants.UTF8).split(",")[0]), tableId, startRow, endRow);
++      return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow);
 +    } catch (NoNodeException nne) {
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +    }
 +
 +  }
 +
 +  static void removeIterators(final long txid, String tableId) throws Exception {
 +    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +    zoo.mutate(zTablePath, null, null, new Mutator() {
 +      @Override
 +      public byte[] mutate(byte[] currentValue) throws Exception {
-         String cvs = new String(currentValue, Constants.UTF8);
++        String cvs = new String(currentValue, UTF_8);
 +        String[] tokens = cvs.split(",");
 +        long flushID = Long.parseLong(tokens[0]);
 +
 +        String txidString = String.format("%016x", txid);
 +
 +        StringBuilder encodedIterators = new StringBuilder();
 +        for (int i = 1; i < tokens.length; i++) {
 +          if (tokens[i].startsWith(txidString))
 +            continue;
 +          encodedIterators.append(",");
 +          encodedIterators.append(tokens[i]);
 +        }
 +
-         return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
++        return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8);
 +      }
 +    });
 +
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    try {
 +      removeIterators(tid, tableId);
 +    } finally {
 +      Utils.unreserveNamespace(namespaceId, tid, false);
 +      Utils.unreserveTable(tableId, tid, false);
 +    }
 +  }
 +
 +}
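
CompactRange.call and removeIterators above read and rewrite the per-table ZTABLE_COMPACT_ID node as a comma-separated string: the first token is the compaction id (called flushID in the code), and each remaining token has the form <txid>=<hex-encoded iterators> for a compaction that registered iterators. The snippet below parses a value of that shape; the sample value itself is made up.

import static com.google.common.base.Charsets.UTF_8;

public class CompactIdFormatExample {
  public static void main(String[] args) {
    // Made-up node value in the shape CompactRange writes:
    // "<compactionId>,<txid>=<hex(iterators)>"
    byte[] currentValue = "42,00000000000000ab=abcdef".getBytes(UTF_8);

    String[] tokens = new String(currentValue, UTF_8).split(",");
    long compactionId = Long.parseLong(tokens[0]);
    System.out.println("compaction id: " + compactionId);

    // Remaining tokens identify transactions that registered iterators.
    for (int i = 1; i < tokens.length; i++) {
      String[] kv = tokens[i].split("=", 2);
      System.out.println("tx " + kv[0] + " iterators (hex): " + kv[1]);
    }
  }
}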

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
index eeb9b16,0000000..037455c
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java
@@@ -1,316 -1,0 +1,318 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.BufferedOutputStream;
 +import java.io.BufferedWriter;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStreamWriter;
 +import java.io.Serializable;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.zip.ZipEntry;
 +import java.util.zip.ZipOutputStream;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +class ExportInfo implements Serializable {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  public String tableName;
 +  public String tableID;
 +  public String exportDir;
 +  public String namespaceID;
 +}
 +
 +class WriteExportFiles extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  private final ExportInfo tableInfo;
 +  
 +  WriteExportFiles(ExportInfo tableInfo) {
 +    this.tableInfo = tableInfo;
 +  }
 +  
 +  private void checkOffline(Connector conn) throws Exception {
 +    if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) {
 +      Tables.clearCache(conn.getInstance());
 +      if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) {
 +        throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
 +            "Table is not offline");
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    
 +    long reserved = Utils.reserveNamespace(tableInfo.namespaceID, tid, false, true, TableOperation.EXPORT)
 +        + Utils.reserveTable(tableInfo.tableID, tid, false, true, TableOperation.EXPORT);
 +    if (reserved > 0)
 +      return reserved;
 +    
 +    Connector conn = master.getConnector();
 +    
 +    checkOffline(conn);
 +    
 +    Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    metaScanner.setRange(new KeyExtent(new Text(tableInfo.tableID), null, null).toMetadataRange());
 +    
 +    // scan for locations
 +    metaScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
 +    metaScanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
 +    
 +    if (metaScanner.iterator().hasNext()) {
 +      return 500;
 +    }
 +    
 +    // use the same range to check for walogs that we used to check for hosted (or future hosted) tablets
 +    // this is done as a separate scan after we check for locations, because walogs are okay only if there is no location
 +    metaScanner.clearColumns();
 +    metaScanner.fetchColumnFamily(LogColumnFamily.NAME);
 +    
 +    if (metaScanner.iterator().hasNext()) {
 +      throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
 +          "Write ahead logs found for table");
 +    }
 +    
 +    return 0;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    Connector conn = master.getConnector();
 +    
 +    try {
 +      exportTable(master.getFileSystem(), conn, tableInfo.tableName, tableInfo.tableID, tableInfo.exportDir);
 +    } catch (IOException ioe) {
 +      throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
 +          "Failed to create export files " + ioe.getMessage());
 +    }
 +    Utils.unreserveNamespace(tableInfo.namespaceID, tid, false);
 +    Utils.unreserveTable(tableInfo.tableID, tid, false);
 +    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
 +    return null;
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    Utils.unreserveNamespace(tableInfo.namespaceID, tid, false);
 +    Utils.unreserveTable(tableInfo.tableID, tid, false);
 +  }
 +  
 +  public static void exportTable(VolumeManager fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
 +    
 +    fs.mkdirs(new Path(exportDir));
 +    Path exportMetaFilePath = fs.getVolumeByPath(new Path(exportDir)).getFileSystem().makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
 +    
 +    FSDataOutputStream fileOut = fs.create(exportMetaFilePath, false);
 +    ZipOutputStream zipOut = new ZipOutputStream(fileOut);
 +    BufferedOutputStream bufOut = new BufferedOutputStream(zipOut);
 +    DataOutputStream dataOut = new DataOutputStream(bufOut);
 +    
 +    try {
 +      
 +      zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_INFO_FILE));
-       OutputStreamWriter osw = new OutputStreamWriter(dataOut, Constants.UTF8);
++      OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8);
 +      osw.append(ExportTable.EXPORT_VERSION_PROP + ":" + ExportTable.VERSION + "\n");
 +      osw.append("srcInstanceName:" + conn.getInstance().getInstanceName() + "\n");
 +      osw.append("srcInstanceID:" + conn.getInstance().getInstanceID() + "\n");
 +      osw.append("srcZookeepers:" + conn.getInstance().getZooKeepers() + "\n");
 +      osw.append("srcTableName:" + tableName + "\n");
 +      osw.append("srcTableID:" + tableID + "\n");
 +      osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n");
 +      osw.append("srcCodeVersion:" + Constants.VERSION + "\n");
 +      
 +      osw.flush();
 +      dataOut.flush();
 +      
 +      exportConfig(conn, tableID, zipOut, dataOut);
 +      dataOut.flush();
 +      
 +      Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
 +      
 +      dataOut.close();
 +      dataOut = null;
 +      
 +      createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);
 +      
 +    } finally {
 +      if (dataOut != null)
 +        dataOut.close();
 +    }
 +  }
 +  
 +  private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
-     BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false), Constants.UTF8));
++    BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false), UTF_8));
 +    
 +    try {
 +      for (String file : uniqueFiles.values()) {
 +        distcpOut.append(file);
 +        distcpOut.newLine();
 +      }
 +      
 +      distcpOut.append(exportMetaFilePath.toString());
 +      distcpOut.newLine();
 +      
 +      distcpOut.close();
 +      distcpOut = null;
 +      
 +    } finally {
 +      if (distcpOut != null)
 +        distcpOut.close();
 +    }
 +  }
 +  
 +  private static Map<String,String> exportMetadata(VolumeManager fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut)
 +      throws IOException, TableNotFoundException {
 +    zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
 +    
 +    Map<String,String> uniqueFiles = new HashMap<String,String>();
 +    
 +    Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
 +    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner);
 +    TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner);
 +    metaScanner.setRange(new KeyExtent(new Text(tableID), null, null).toMetadataRange());
 +    
 +    for (Entry<Key,Value> entry : metaScanner) {
 +      entry.getKey().write(dataOut);
 +      entry.getValue().write(dataOut);
 +      
 +      if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 +        String path = fs.getFullPath(entry.getKey()).toString();
 +        String tokens[] = path.split("/");
 +        if (tokens.length < 1) {
 +          throw new RuntimeException("Illegal path " + path);
 +        }
 +        
 +        String filename = tokens[tokens.length - 1];
 +        
 +        String existingPath = uniqueFiles.get(filename);
 +        if (existingPath == null) {
 +          uniqueFiles.put(filename, path);
 +        } else if (!existingPath.equals(path)) {
 +          // make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
 +          throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
 +        }
 +        
 +      }
 +    }
 +    return uniqueFiles;
 +  }
 +  
 +  private static void exportConfig(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException, IOException {
 +    
 +    DefaultConfiguration defaultConfig = AccumuloConfiguration.getDefaultConfiguration();
 +    Map<String,String> siteConfig = conn.instanceOperations().getSiteConfiguration();
 +    Map<String,String> systemConfig = conn.instanceOperations().getSystemConfiguration();
 +    
 +    TableConfiguration tableConfig = ServerConfiguration.getTableConfiguration(conn.getInstance(), tableID);
 +    
-     OutputStreamWriter osw = new OutputStreamWriter(dataOut, Constants.UTF8);
++    OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8);
 +    
 +    // only put props that are different than defaults and higher level configurations
 +    zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_TABLE_CONFIG_FILE));
 +    for (Entry<String,String> prop : tableConfig) {
 +      if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
 +        Property key = Property.getPropertyByKey(prop.getKey());
 +        
 +        if (key == null || !defaultConfig.get(key).equals(prop.getValue())) {
 +          if (!prop.getValue().equals(siteConfig.get(prop.getKey())) && !prop.getValue().equals(systemConfig.get(prop.getKey()))) {
 +            osw.append(prop.getKey() + "=" + prop.getValue() + "\n");
 +          }
 +        }
 +      }
 +    }
 +    
 +    osw.flush();
 +  }
 +}
 +
 +public class ExportTable extends MasterRepo {
 +  private static final long serialVersionUID = 1L;
 +  
 +  private final ExportInfo tableInfo;
 +  
 +  public ExportTable(String tableName, String tableId, String exportDir) {
 +    tableInfo = new ExportInfo();
 +    tableInfo.tableName = tableName;
 +    tableInfo.exportDir = exportDir;
 +    tableInfo.tableID = tableId;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    tableInfo.namespaceID = Tables.getNamespaceId(inst, tableId);
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master env) throws Exception {
 +    return new WriteExportFiles(tableInfo);
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
 +  }
 +  
 +  public static final int VERSION = 1;
 +  
 +  public static final String DATA_VERSION_PROP = "srcDataVersion";
 +  public static final String EXPORT_VERSION_PROP = "exportVersion";
 +  
 +}
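
For reference, the export directory written by WriteExportFiles above ends up holding two artifacts: the zip named by Constants.EXPORT_FILE (containing the key:value export info entry, the serialized metadata key/values, and the table config entry) and a plain-text distcp.txt that lists every referenced RFile plus, last, the export metadata file itself. Below is a rough, standalone sketch of inspecting such an export with stock Hadoop and JDK APIs; the directory path and the literal file names other than distcp.txt are illustrative stand-ins for the Constants referenced in the diff.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class InspectExport {
      public static void main(String[] args) throws Exception {
        // export directory produced by the export operation; the path is passed in and illustrative
        Path exportDir = new Path(args[0]);
        FileSystem fs = exportDir.getFileSystem(new Configuration());

        // distcp.txt: one absolute source path per line, ending with the export metadata zip
        BufferedReader distcp = new BufferedReader(
            new InputStreamReader(fs.open(new Path(exportDir, "distcp.txt")), StandardCharsets.UTF_8));
        try {
          String line;
          while ((line = distcp.readLine()) != null)
            System.out.println("to copy: " + line);
        } finally {
          distcp.close();
        }

        // the export zip (Constants.EXPORT_FILE; the literal "exportMetadata.zip" is assumed here):
        // print the key:value lines of the export info entry
        ZipInputStream zis = new ZipInputStream(fs.open(new Path(exportDir, "exportMetadata.zip")));
        try {
          ZipEntry entry;
          while ((entry = zis.getNextEntry()) != null) {
            if (entry.getName().endsWith("_info.txt")) { // Constants.EXPORT_INFO_FILE; suffix assumed
              BufferedReader in = new BufferedReader(new InputStreamReader(zis, StandardCharsets.UTF_8));
              String infoLine;
              while ((infoLine = in.readLine()) != null)
                System.out.println(infoLine); // e.g. exportVersion:1, srcTableName:..., srcDataVersion:...
              break;
            }
          }
        } finally {
          zis.close();
        }
      }
    }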

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
index 8de6bce,0000000..05ec263
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java
@@@ -1,653 -1,0 +1,655 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.BufferedInputStream;
 +import java.io.BufferedReader;
 +import java.io.BufferedWriter;
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.OutputStreamWriter;
 +import java.io.Serializable;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.zip.ZipEntry;
 +import java.util.zip.ZipInputStream;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.Namespaces;
 +import org.apache.accumulo.core.client.impl.TableOperationsImpl;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.FastFormat;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.TablePropUtil;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +/**
 + *
 + */
 +class ImportedTableInfo implements Serializable {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  public String exportDir;
 +  public String user;
 +  public String tableName;
 +  public String tableId;
 +  public String importDir;
 +  public String namespaceId;
 +}
 +
 +class FinishImportTable extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  public FinishImportTable(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return 0;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master env) throws Exception {
 +
 +    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
 +
 +    TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
 +
 +    Utils.unreserveNamespace(tableInfo.namespaceId, tid, false);
 +    Utils.unreserveTable(tableInfo.tableId, tid, true);
 +
 +    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
 +
 +    env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName);
 +
 +    Logger.getLogger(FinishImportTable.class).debug("Imported table " + tableInfo.tableId + " " + tableInfo.tableName);
 +
 +    return null;
 +  }
 +
 +  @Override
 +  public String getReturn() {
 +    return tableInfo.tableId;
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {}
 +
 +}
 +
 +class MoveExportedFiles extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  MoveExportedFiles(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    try {
 +      VolumeManager fs = master.getFileSystem();
 +
 +      Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo);
 +
 +      for (String oldFileName : fileNameMappings.keySet()) {
 +        if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) {
 +          throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +              "File referenced by exported table does not exists " + oldFileName);
 +        }
 +      }
 +
 +      FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
 +
 +      for (FileStatus fileStatus : files) {
 +        String newName = fileNameMappings.get(fileStatus.getPath().getName());
 +
 +        if (newName != null)
 +          fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName));
 +      }
 +
 +      return new FinishImportTable(tableInfo);
 +    } catch (IOException ioe) {
 +      log.warn(ioe.getMessage(), ioe);
 +      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +          "Error renaming files " + ioe.getMessage());
 +    }
 +  }
 +}
 +
 +class PopulateMetadataTable extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  PopulateMetadataTable(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
-     BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt")), Constants.UTF8));
++    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt")), UTF_8));
 +
 +    try {
 +      Map<String,String> map = new HashMap<String,String>();
 +
 +      String line = null;
 +      while ((line = in.readLine()) != null) {
 +        String sa[] = line.split(":", 2);
 +        map.put(sa[0], sa[1]);
 +      }
 +
 +      return map;
 +    } finally {
 +      in.close();
 +    }
 +
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +
 +    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
 +
 +    BatchWriter mbw = null;
 +    ZipInputStream zis = null;
 +
 +    try {
 +      VolumeManager fs = master.getFileSystem();
 +
 +      mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +
 +      zis = new ZipInputStream(fs.open(path));
 +
 +      Map<String,String> fileNameMappings = readMappingFile(fs, tableInfo);
 +
 +      log.info("importDir is " + tableInfo.importDir);
 +
 +      // This is a directory already prefixed with proper volume information e.g. hdfs://localhost:8020/path/to/accumulo/tables/...
 +      final String bulkDir = tableInfo.importDir;
 +
 +      final String[] tableDirs = ServerConstants.getTablesDirs();
 +
 +      ZipEntry zipEntry;
 +      while ((zipEntry = zis.getNextEntry()) != null) {
 +        if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
 +          DataInputStream in = new DataInputStream(new BufferedInputStream(zis));
 +
 +          Key key = new Key();
 +          Value val = new Value();
 +
 +          Mutation m = null;
 +          Text currentRow = null;
 +          int dirCount = 0;
 +
 +          while (true) {
 +            key.readFields(in);
 +            val.readFields(in);
 +
 +            Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow();
 +            Text metadataRow = new KeyExtent(new Text(tableInfo.tableId), endRow, null).getMetadataEntry();
 +
 +            Text cq;
 +
 +            if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 +              String oldName = new Path(key.getColumnQualifier().toString()).getName();
 +              String newName = fileNameMappings.get(oldName);
 +
 +              if (newName == null) {
 +                throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +                    "File " + oldName + " does not exist in import dir");
 +              }
 +
 +              cq = new Text(bulkDir + "/" + newName);
 +            } else {
 +              cq = key.getColumnQualifier();
 +            }
 +
 +            if (m == null) {
 +              // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator
-               String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), Constants.UTF8);
++              String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8);
 +
 +              // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX
 +              String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir);
 +
 +              m = new Mutation(metadataRow);
-               TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(Constants.UTF8)));
++              TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8)));
 +              currentRow = metadataRow;
 +            }
 +
 +            if (!currentRow.equals(metadataRow)) {
 +              mbw.addMutation(m);
 +
 +              // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator
-               String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), Constants.UTF8);
++              String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8);
 +
 +              // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX
 +              String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir);
 +
 +              m = new Mutation(metadataRow);
-               TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(Constants.UTF8)));
++              TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8)));
 +            }
 +
 +            m.put(key.getColumnFamily(), cq, val);
 +
 +            if (endRow == null && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
 +              mbw.addMutation(m);
 +              break; // it's the last column in the last row
 +            }
 +          }
 +
 +          break;
 +        }
 +      }
 +
 +      return new MoveExportedFiles(tableInfo);
 +    } catch (IOException ioe) {
 +      log.warn(ioe.getMessage(), ioe);
 +      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +          "Error reading " + path + " " + ioe.getMessage());
 +    } finally {
 +      if (zis != null) {
 +        try {
 +          zis.close();
 +        } catch (IOException ioe) {
 +          log.warn("Failed to close zip file ", ioe);
 +        }
 +      }
 +
 +      if (mbw != null) {
 +        mbw.close();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Given options for tables (across multiple volumes), construct an absolute path using the unique name within the chosen volume
 +   *
 +   * @return An absolute, unique path for the imported table
 +   */
 +  protected String getClonedTabletDir(Master master, String[] tableDirs, String tabletDir) {
 +    // We can try to spread out the tablet dirs across all volumes
 +    String tableDir = master.getFileSystem().choose(tableDirs);
 +
 +    // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX
 +    return tableDir + "/" + tableInfo.tableId + "/" + tabletDir;
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    MetadataTableUtil.deleteTable(tableInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
 +  }
 +}
 +
 +class MapImportFileNames extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  MapImportFileNames(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +
 +    Path path = new Path(tableInfo.importDir, "mappings.txt");
 +
 +    BufferedWriter mappingsWriter = null;
 +
 +    try {
 +      VolumeManager fs = environment.getFileSystem();
 +
 +      fs.mkdirs(new Path(tableInfo.importDir));
 +
 +      FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
 +
 +      UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +
-       mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path), Constants.UTF8));
++      mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path), UTF_8));
 +
 +      for (FileStatus fileStatus : files) {
 +        String fileName = fileStatus.getPath().getName();
 +        log.info("filename " + fileStatus.getPath().toString());
 +        String sa[] = fileName.split("\\.");
 +        String extension = "";
 +        if (sa.length > 1) {
 +          extension = sa[sa.length - 1];
 +
 +          if (!FileOperations.getValidExtensions().contains(extension)) {
 +            continue;
 +          }
 +        } else {
 +          // assume it is a map file
 +          extension = Constants.MAPFILE_EXTENSION;
 +        }
 +
 +        String newName = "I" + namer.getNextName() + "." + extension;
 +
 +        mappingsWriter.append(fileName);
 +        mappingsWriter.append(':');
 +        mappingsWriter.append(newName);
 +        mappingsWriter.newLine();
 +      }
 +
 +      mappingsWriter.close();
 +      mappingsWriter = null;
 +
 +      return new PopulateMetadataTable(tableInfo);
 +    } catch (IOException ioe) {
 +      log.warn(ioe.getMessage(), ioe);
 +      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +          "Error writing mapping file " + path + " " + ioe.getMessage());
 +    } finally {
 +      if (mappingsWriter != null)
 +        try {
 +          mappingsWriter.close();
 +        } catch (IOException ioe) {
 +          log.warn("Failed to close " + path, ioe);
 +        }
 +    }
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
 +  }
 +}
 +
 +class CreateImportDir extends MasterRepo {
 +  private static final Logger log = Logger.getLogger(CreateImportDir.class);
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  CreateImportDir(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +
 +    Path exportDir = new Path(tableInfo.exportDir);
 +    String[] tableDirs = ServerConstants.getTablesDirs();
 +
 +    log.info("Looking for matching filesystem for " + exportDir + " from options " + Arrays.toString(tableDirs));
 +    Path base = master.getFileSystem().matchingFileSystem(exportDir, tableDirs);
 +    log.info("Chose base table directory of " + base);
 +    Path directory = new Path(base, tableInfo.tableId);
 +
 +    Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
 +
 +    tableInfo.importDir = newBulkDir.toString();
 +
 +    log.info("Using import dir: " + tableInfo.importDir);
 +
 +    return new MapImportFileNames(tableInfo);
 +  }
 +}
 +
 +class ImportPopulateZookeeper extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  ImportPopulateZookeeper(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
 +  }
 +
 +  private Map<String,String> getExportedProps(VolumeManager fs) throws Exception {
 +
 +    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
 +
 +    try {
 +      FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
 +      return TableOperationsImpl.getExportedProps(ns, path);
 +    } catch (IOException ioe) {
 +      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +          "Error reading table props from " + path + " " + ioe.getMessage());
 +    }
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master env) throws Exception {
 +    // reserve the table name in zookeeper or fail
 +
 +    Utils.tableNameLock.lock();
 +    try {
 +      // write tableName & tableId to zookeeper
 +      Instance instance = HdfsZooInstance.getInstance();
 +
 +      Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE);
 +
 +      String namespace = Tables.qualify(tableInfo.tableName).getFirst();
 +      String namespaceId = Namespaces.getNamespaceId(instance, namespace);
 +      TableManager.getInstance().addTable(tableInfo.tableId, namespaceId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE);
 +
 +      Tables.clearCache(instance);
 +    } finally {
 +      Utils.tableNameLock.unlock();
 +    }
 +
 +    for (Entry<String,String> entry : getExportedProps(env.getFileSystem()).entrySet())
 +      if (!TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue())) {
 +        throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +            "Invalid table property " + entry.getKey());
 +      }
 +
 +    return new CreateImportDir(tableInfo);
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    Instance instance = HdfsZooInstance.getInstance();
 +    TableManager.getInstance().removeTable(tableInfo.tableId);
 +    Utils.unreserveTable(tableInfo.tableId, tid, true);
 +    Tables.clearCache(instance);
 +  }
 +}
 +
 +class ImportSetupPermissions extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  public ImportSetupPermissions(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return 0;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master env) throws Exception {
 +    // give all table permissions to the creator
 +    SecurityOperation security = AuditedSecurityOperation.getInstance();
 +    for (TablePermission permission : TablePermission.values()) {
 +      try {
 +        security.grantTablePermission(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.user, tableInfo.tableId, permission, tableInfo.namespaceId);
 +      } catch (ThriftSecurityException e) {
 +        Logger.getLogger(ImportSetupPermissions.class).error(e.getMessage(), e);
 +        throw e;
 +      }
 +    }
 +
 +    // setup permissions in zookeeper before table info in zookeeper
 +    // this way concurrent users will not get a spurious permission denied
 +    // error
 +    return new ImportPopulateZookeeper(tableInfo);
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(env.getInstance()), tableInfo.tableId, tableInfo.namespaceId);
 +  }
 +}
 +
 +public class ImportTable extends MasterRepo {
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  public ImportTable(String user, String tableName, String exportDir, String namespaceId) {
 +    tableInfo = new ImportedTableInfo();
 +    tableInfo.tableName = tableName;
 +    tableInfo.user = user;
 +    tableInfo.exportDir = exportDir;
 +    tableInfo.namespaceId = namespaceId;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid)
 +        + Utils.reserveNamespace(tableInfo.namespaceId, tid, false, true, TableOperation.IMPORT);
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master env) throws Exception {
 +    checkVersions(env);
 +
 +    // the first step is to reserve a table id... if the machine fails during this step
 +    // it is ok to retry... the only side effect is that a table id may be allocated
 +    // but never used (i.e. skipped)
 +
 +    // assuming only the master process is creating tables
 +
 +    Utils.idLock.lock();
 +    try {
 +      Instance instance = HdfsZooInstance.getInstance();
 +      tableInfo.tableId = Utils.getNextTableId(tableInfo.tableName, instance);
 +      return new ImportSetupPermissions(tableInfo);
 +    } finally {
 +      Utils.idLock.unlock();
 +    }
 +  }
 +
 +  public void checkVersions(Master env) throws ThriftTableOperationException {
 +    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
 +
 +    ZipInputStream zis = null;
 +
 +    try {
 +      zis = new ZipInputStream(env.getFileSystem().open(path));
 +
 +      Integer exportVersion = null;
 +      Integer dataVersion = null;
 +
 +      ZipEntry zipEntry;
 +      while ((zipEntry = zis.getNextEntry()) != null) {
 +        if (zipEntry.getName().equals(Constants.EXPORT_INFO_FILE)) {
-           BufferedReader in = new BufferedReader(new InputStreamReader(zis, Constants.UTF8));
++          BufferedReader in = new BufferedReader(new InputStreamReader(zis, UTF_8));
 +          String line = null;
 +          while ((line = in.readLine()) != null) {
 +            String sa[] = line.split(":", 2);
 +            if (sa[0].equals(ExportTable.EXPORT_VERSION_PROP)) {
 +              exportVersion = Integer.parseInt(sa[1]);
 +            } else if (sa[0].equals(ExportTable.DATA_VERSION_PROP)) {
 +              dataVersion = Integer.parseInt(sa[1]);
 +            }
 +          }
 +
 +          break;
 +        }
 +      }
 +
 +      zis.close();
 +      zis = null;
 +
 +      if (exportVersion == null || exportVersion > ExportTable.VERSION)
 +        throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +            "Incompatible export version " + exportVersion);
 +
 +      if (dataVersion == null || dataVersion > ServerConstants.DATA_VERSION)
 +        throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +            "Incompatible data version " + exportVersion);
 +
 +    } catch (IOException ioe) {
 +      log.warn(ioe.getMessage(), ioe);
 +      throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
 +          "Failed to read export metadata " + ioe.getMessage());
 +    } finally {
 +      if (zis != null)
 +        try {
 +          zis.close();
 +        } catch (IOException ioe) {
 +          log.warn(ioe.getMessage(), ioe);
 +        }
 +    }
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master env) throws Exception {
 +    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
 +    Utils.unreserveNamespace(tableInfo.namespaceId, tid, false);
 +  }
 +}
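
Taken together, the repos above implement the server side of the table export/import FATE operations. The outline below sketches how they are typically driven from the client API; Connector setup is omitted, the table names, paths, and the distcp invocation are illustrative, and since the export code above keeps deferring (returning 500 from its readiness check) while tablets are hosted and fails when write-ahead logs are present, the table should be taken offline first.

    import org.apache.accumulo.core.client.Connector;

    public class ExportImportOutline {
      // src and dst are Connectors to the source and destination instances, obtained
      // elsewhere (e.g. via ZooKeeperInstance.getConnector); names and paths are illustrative
      public static void exportThenImport(Connector src, Connector dst) throws Exception {
        // take the table offline so the readiness check above can pass
        src.tableOperations().offline("mytable");
        src.tableOperations().exportTable("mytable", "/export/mytable");

        // copy the export to the destination, driven by the generated distcp.txt, e.g.:
        //   hadoop distcp -f hdfs://src-nn/export/mytable/distcp.txt hdfs://dst-nn/import/mytable

        // the import repos above then relabel the copied RFiles (mappings.txt lines are
        // oldName:newName), rewrite the metadata entries, and bring the new table online
        dst.tableOperations().importTable("mytable_copy", "/import/mytable");
      }
    }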

