accumulo-commits mailing list archives

From: els...@apache.org
Subject: [28/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date: Tue, 04 Feb 2014 17:54:57 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
index 70ba661,0000000..f950077
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@@ -1,86 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.zookeeper;
 +
 +import java.lang.reflect.InvocationHandler;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.lang.reflect.Proxy;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.fate.util.UtilWaitThread;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +
 +public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
 +  private static final String SCHEME = "digest";
 +  private static final String USER = "accumulo";
 +  private static ZooReaderWriter instance = null;
 +  private static IZooReaderWriter retryingInstance = null;
 +  
 +  public ZooReaderWriter(String string, int timeInMillis, String secret) {
-     super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes());
++    super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(Constants.UTF8));
 +  }
 +  
 +  public static synchronized ZooReaderWriter getInstance() {
 +    if (instance == null) {
 +      AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
 +      instance = new ZooReaderWriter(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
 +          conf.get(Property.INSTANCE_SECRET));
 +    }
 +    return instance;
 +  }
 +  
 +  /**
 +   * get an instance that retries when zookeeper connection errors occur
 +   * 
 +   * @return an instance that retries when Zookeeper connection errors occur.
 +   */
 +  public static synchronized IZooReaderWriter getRetryingInstance() {
 +    
 +    if (retryingInstance == null) {
 +      final IZooReaderWriter inst = getInstance();
 +      
 +      InvocationHandler ih = new InvocationHandler() {
 +        @Override
 +        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
 +          long retryTime = 250;
 +          while (true) {
 +            try {
 +              return method.invoke(inst, args);
 +            } catch (InvocationTargetException e) {
 +              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
 +                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
 +                UtilWaitThread.sleep(retryTime);
 +                retryTime = Math.min(5000, retryTime + 250);
 +              } else {
 +                throw e.getCause();
 +              }
 +            }
 +          }
 +        }
 +      };
 +      
 +      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
 +    }
 +    
 +    return retryingInstance;
 +  }
 +}
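
[Editor's note] The proxy returned by getRetryingInstance() above re-invokes any IZooReaderWriter call that fails with KeeperException.ConnectionLossException, sleeping 250 ms longer per attempt up to a 5000 ms cap. A minimal usage sketch follows; it is not part of this commit, the znode path and payload are hypothetical, and it assumes putPersistentData(String, byte[], NodeExistsPolicy) is exposed on the IZooReaderWriter interface, as the SetGoalState hunk later in this message suggests.

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;

public class RetryingZooWriteExample {
  public static void main(String[] args) throws Exception {
    IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
    // A lost ZooKeeper connection here is handled inside the InvocationHandler
    // (sleep, then re-invoke) rather than surfacing to the caller.
    zk.putPersistentData("/accumulo/example-node",
        "example-data".getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE);
  }
}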

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/extras/src/main/java/org/apache/accumulo/utils/metanalysis/IndexMeta.java
----------------------------------------------------------------------
diff --cc server/extras/src/main/java/org/apache/accumulo/utils/metanalysis/IndexMeta.java
index b296f6d,0000000..448b871
mode 100644,000000..100644
--- a/server/extras/src/main/java/org/apache/accumulo/utils/metanalysis/IndexMeta.java
+++ b/server/extras/src/main/java/org/apache/accumulo/utils/metanalysis/IndexMeta.java
@@@ -1,182 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.utils.metanalysis;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.cli.ClientOpts;
 +import org.apache.accumulo.tserver.logger.LogEvents;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.apache.log4j.Logger;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * A map reduce job that takes write ahead logs containing mutations for the metadata table and indexes them into Accumulo tables for analysis.
 + * 
 + */
 +
 +public class IndexMeta extends Configured implements Tool {
 +  
 +  public static class IndexMapper extends Mapper<LogFileKey,LogFileValue,Text,Mutation> {
 +    private static final Text CREATE_EVENTS_TABLE = new Text("createEvents");
 +    private static final Text TABLET_EVENTS_TABLE = new Text("tabletEvents");
 +    private Map<Integer,KeyExtent> tabletIds = new HashMap<Integer,KeyExtent>();
 +    private String uuid = null;
 +    
 +    @Override
 +    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
 +      tabletIds = new HashMap<Integer,KeyExtent>();
 +      uuid = null;
 +    }
 +    
 +    @Override
 +    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
 +      if (key.event == LogEvents.OPEN) {
 +        uuid = key.tserverSession;
 +      } else if (key.event == LogEvents.DEFINE_TABLET) {
 +        if (key.tablet.getTableId().toString().equals(MetadataTable.ID)) {
 +          tabletIds.put(key.tid, new KeyExtent(key.tablet));
 +        }
 +      } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.containsKey(key.tid)) {
 +        for (Mutation m : value.mutations) {
 +          index(context, m, uuid, tabletIds.get(key.tid));
 +        }
 +      }
 +    }
 +    
 +    void index(Context context, Mutation m, String logFile, KeyExtent metaTablet) throws IOException, InterruptedException {
 +      List<ColumnUpdate> columnsUpdates = m.getUpdates();
 +      
 +      Text prevRow = null;
 +      long timestamp = 0;
 +      
 +      if (m.getRow().length > 0 && m.getRow()[0] == '~') {
 +        return;
 +      }
 +      
 +      for (ColumnUpdate cu : columnsUpdates) {
 +        if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && !cu.isDeleted()) {
 +          prevRow = new Text(cu.getValue());
 +        }
 +        
 +        timestamp = cu.getTimestamp();
 +      }
 +      
 +      byte[] serMut = WritableUtils.toByteArray(m);
 +      
 +      if (prevRow != null) {
 +        Mutation createEvent = new Mutation(new Text(m.getRow()));
-         createEvent.put(prevRow, new Text(String.format("%020d", timestamp)), new Value(metaTablet.toString().getBytes()));
++        createEvent.put(prevRow, new Text(String.format("%020d", timestamp)), new Value(metaTablet.toString().getBytes(Constants.UTF8)));
 +        context.write(CREATE_EVENTS_TABLE, createEvent);
 +      }
 +      
 +      Mutation tabletEvent = new Mutation(new Text(m.getRow()));
 +      tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mut"), new Value(serMut));
-       tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mtab"), new Value(metaTablet.toString().getBytes()));
-       tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("log"), new Value(logFile.getBytes()));
++      tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mtab"), new Value(metaTablet.toString().getBytes(Constants.UTF8)));
++      tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("log"), new Value(logFile.getBytes(Constants.UTF8)));
 +      context.write(TABLET_EVENTS_TABLE, tabletEvent);
 +    }
 +  }
 +  
 +  static class Opts extends ClientOpts {
 +    @Parameter(description = "<logfile> { <logfile> ...}")
 +    List<String> logFiles = new ArrayList<String>();
 +    
 +    public ClientConfiguration getConf() {
 +      return this.getClientConfiguration();
 +    }
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(IndexMeta.class.getName(), args);
 +    
 +    String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
 +
 +    @SuppressWarnings("deprecation")
 +    Job job = new Job(getConf(), jobName);
 +    job.setJarByClass(this.getClass());
 +    
 +    List<String> logFiles = Arrays.asList(args).subList(4, args.length);
 +    Path paths[] = new Path[logFiles.size()];
 +    int count = 0;
 +    for (String logFile : logFiles) {
 +      paths[count++] = new Path(logFile);
 +    }
 +    
 +    job.setInputFormatClass(LogFileInputFormat.class);
 +    LogFileInputFormat.setInputPaths(job, paths);
 +    
 +    job.setNumReduceTasks(0);
 +    
 +    job.setOutputFormatClass(AccumuloOutputFormat.class);
 +    AccumuloOutputFormat.setZooKeeperInstance(job, opts.getConf());
 +    AccumuloOutputFormat.setConnectorInfo(job, opts.principal, opts.getToken());
 +    AccumuloOutputFormat.setCreateTables(job, false);
 +    
 +    job.setMapperClass(IndexMapper.class);
 +    
 +    Connector conn = opts.getConnector();
 +    
 +    try {
 +      conn.tableOperations().create("createEvents");
 +    } catch (TableExistsException tee) {
 +      Logger.getLogger(IndexMeta.class).warn("Table createEvents exists");
 +    }
 +    
 +    try {
 +      conn.tableOperations().create("tabletEvents");
 +    } catch (TableExistsException tee) {
 +      Logger.getLogger(IndexMeta.class).warn("Table tabletEvents exists");
 +    }
 +    
 +    job.waitForCompletion(true);
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new IndexMeta(), args);
 +    System.exit(res);
 +  }
 +}
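
[Editor's note] Several hunks in this merge, here and in ZooReaderWriter, RecoveryManager, SetGoalState, and BulkImport, replace the no-argument String.getBytes() with getBytes(Constants.UTF8). The no-argument form encodes with the JVM's platform default charset, so the bytes written to ZooKeeper or the metadata table could vary between hosts; passing an explicit charset makes the encoding deterministic. A standalone sketch of the difference (the sample string is illustrative only):

import java.nio.charset.Charset;
import java.util.Arrays;

public class ExplicitCharsetExample {
  public static void main(String[] args) {
    String s = "accumulo \u00e9";                        // contains a non-ASCII character
    byte[] platformDefault = s.getBytes();               // depends on the JVM's default charset
    byte[] utf8 = s.getBytes(Charset.forName("UTF-8"));  // always the same bytes
    System.out.println("default charset: " + Charset.defaultCharset());
    System.out.println("identical bytes: " + Arrays.equals(platformDefault, utf8));
  }
}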

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 390b33f,0000000..2479d63
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@@ -1,179 -1,0 +1,179 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.recovery;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 +import org.apache.accumulo.server.master.recovery.LogCloser;
 +import org.apache.accumulo.server.master.recovery.RecoveryPath;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +
 +public class RecoveryManager {
 +
 +  private static Logger log = Logger.getLogger(RecoveryManager.class);
 +
 +  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
 +  private Set<String> closeTasksQueued = new HashSet<String>();
 +  private Set<String> sortsQueued = new HashSet<String>();
 +  private ScheduledExecutorService executor;
 +  private Master master;
 +  private ZooCache zooCache;
 +
 +  public RecoveryManager(Master master) {
 +    this.master = master;
 +    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
 +    zooCache = new ZooCache();
 +    try {
 +      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
 +      sortsQueued.addAll(workIDs);
 +    } catch (Exception e) {
 +      log.warn(e, e);
 +    }
 +  }
 +
 +  private class LogSortTask implements Runnable {
 +    private String source;
 +    private String destination;
 +    private String sortId;
 +    private LogCloser closer;
 +
 +    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
 +      this.closer = closer;
 +      this.source = source;
 +      this.destination = destination;
 +      this.sortId = sortId;
 +    }
 +
 +    @Override
 +    public void run() {
 +      boolean rescheduled = false;
 +      try {
 +
 +        long time = closer.close(master.getConfiguration().getConfiguration(), master.getFileSystem(), new Path(source));
 +
 +        if (time > 0) {
 +          executor.schedule(this, time, TimeUnit.MILLISECONDS);
 +          rescheduled = true;
 +        } else {
 +          initiateSort(sortId, source, destination);
 +        }
 +      } catch (FileNotFoundException e) {
 +        log.debug("Unable to initate log sort for " + source + ": " + e);
 +      } catch (Exception e) {
 +        log.warn("Failed to initiate log sort " + source, e);
 +      } finally {
 +        if (!rescheduled) {
 +          synchronized (RecoveryManager.this) {
 +            closeTasksQueued.remove(sortId);
 +          }
 +        }
 +      }
 +    }
 +
 +  }
 +
 +  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
 +    String work = source + "|" + destination;
-     new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
++    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes(Constants.UTF8));
 +
 +    synchronized (this) {
 +      sortsQueued.add(sortId);
 +    }
 +
 +    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
 +    log.info("Created zookeeper entry " + path + " with data " + work);
 +  }
 +
 +  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
 +    boolean recoveryNeeded = false;
 +
 +    for (Collection<String> logs : walogs) {
 +      for (String walog : logs) {
 +
 +        String parts[] = walog.split("/");
 +        String sortId = parts[parts.length - 1];
 +        String filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
 +        String dest = RecoveryPath.getRecoveryPath(master.getFileSystem(), new Path(filename)).toString();
 +        log.debug("Recovering " + filename + " to " + dest);
 +
 +        boolean sortQueued;
 +        synchronized (this) {
 +          sortQueued = sortsQueued.contains(sortId);
 +        }
 +
 +        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
 +          synchronized (this) {
 +            sortsQueued.remove(sortId);
 +          }
 +        }
 +
 +        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
 +          synchronized (this) {
 +            closeTasksQueued.remove(sortId);
 +            recoveryDelay.remove(sortId);
 +            sortsQueued.remove(sortId);
 +          }
 +          continue;
 +        }
 +
 +        recoveryNeeded = true;
 +        synchronized (this) {
 +          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
 +            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
 +            LogCloser closer = aconf.instantiateClassProperty(Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
 +            Long delay = recoveryDelay.get(sortId);
 +            if (delay == null) {
 +              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
 +            } else {
 +              delay = Math.min(2 * delay, 1000 * 60 * 5l);
 +            }
 +
 +            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s), tablet " + extent + " holds a reference");
 +
 +            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
 +            closeTasksQueued.add(sortId);
 +            recoveryDelay.put(sortId, delay);
 +          }
 +        }
 +      }
 +    }
 +    return recoveryNeeded;
 +  }
 +}
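
[Editor's note] recoverLogs() doubles the per-sortId delay each time recovery of the same log is re-queued, capping it at five minutes. A minimal sketch of that schedule, assuming a hypothetical 10-second initial MASTER_RECOVERY_DELAY:

public class RecoveryDelayExample {
  public static void main(String[] args) {
    long delay = 10 * 1000L;            // hypothetical initial MASTER_RECOVERY_DELAY (ms)
    final long cap = 1000 * 60 * 5L;    // same five-minute ceiling as RecoveryManager
    for (int attempt = 1; attempt <= 8; attempt++) {
      System.out.println("attempt " + attempt + ": wait " + (delay / 1000) + "s");
      delay = Math.min(2 * delay, cap); // doubled on each re-queue, capped at the ceiling
    }
    // prints 10s, 20s, 40s, 80s, 160s, 300s, 300s, 300s
  }
}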

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index 6b692d8,0000000..4737b6e
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@@ -1,260 -1,0 +1,260 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.state;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.cli.ClientOpts;
 +import org.apache.accumulo.server.master.state.CurrentState;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.data.Stat;
 +
 +public class MergeStats {
 +  final static private Logger log = Logger.getLogger(MergeStats.class);
 +  MergeInfo info;
 +  int hosted = 0;
 +  int unassigned = 0;
 +  int chopped = 0;
 +  int needsToBeChopped = 0;
 +  int total = 0;
 +  boolean lowerSplit = false;
 +  boolean upperSplit = false;
 +  
 +  public MergeStats(MergeInfo info) {
 +    this.info = info;
 +    if (info.getState().equals(MergeState.NONE))
 +      return;
 +    if (info.getExtent().getEndRow() == null)
 +      upperSplit = true;
 +    if (info.getExtent().getPrevEndRow() == null)
 +      lowerSplit = true;
 +  }
 +  
 +  public MergeInfo getMergeInfo() {
 +    return info;
 +  }
 +  
 +  public void update(KeyExtent ke, TabletState state, boolean chopped, boolean hasWALs) {
 +    if (info.getState().equals(MergeState.NONE))
 +      return;
 +    if (!upperSplit && info.getExtent().getEndRow().equals(ke.getPrevEndRow())) {
 +      log.info("Upper split found");
 +      upperSplit = true;
 +    }
 +    if (!lowerSplit && info.getExtent().getPrevEndRow().equals(ke.getEndRow())) {
 +      log.info("Lower split found");
 +      lowerSplit = true;
 +    }
 +    if (!info.overlaps(ke))
 +      return;
 +    if (info.needsToBeChopped(ke)) {
 +      this.needsToBeChopped++;
 +      if (chopped) {
 +        if (state.equals(TabletState.HOSTED)) {
 +          this.chopped++;
 +        } else if (!hasWALs) {
 +          this.chopped++;
 +        }
 +      }
 +    }
 +    this.total++;
 +    if (state.equals(TabletState.HOSTED))
 +      this.hosted++;
 +    if (state.equals(TabletState.UNASSIGNED))
 +      this.unassigned++;
 +  }
 +  
 +  public MergeState nextMergeState(Connector connector, CurrentState master) throws Exception {
 +    MergeState state = info.getState();
 +    if (state == MergeState.NONE)
 +      return state;
 +    if (total == 0) {
 +      log.trace("failed to see any tablets for this range, ignoring " + info.getExtent());
 +      return state;
 +    }
 +    log.info("Computing next merge state for " + info.getExtent() + " which is presently " + state + " isDelete : " + info.isDelete());
 +    if (state == MergeState.STARTED) {
 +      state = MergeState.SPLITTING;
 +    }
 +    if (state == MergeState.SPLITTING) {
 +      log.info(hosted + " are hosted, total " + total);
 +      if (!info.isDelete() && total == 1) {
 +        log.info("Merge range is already contained in a single tablet " + info.getExtent());
 +        state = MergeState.COMPLETE;
 +      } else if (hosted == total) {
 +        if (info.isDelete()) {
 +          if (!lowerSplit)
 +            log.info("Waiting for " + info + " lower split to occur " + info.getExtent());
 +          else if (!upperSplit)
 +            log.info("Waiting for " + info + " upper split to occur " + info.getExtent());
 +          else
 +            state = MergeState.WAITING_FOR_CHOPPED;
 +        } else {
 +          state = MergeState.WAITING_FOR_CHOPPED;
 +        }
 +      } else {
 +        log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getExtent());
 +      }
 +    }
 +    if (state == MergeState.WAITING_FOR_CHOPPED) {
 +      log.info(chopped + " tablets are chopped " + info.getExtent());
 +      if (chopped == needsToBeChopped) {
 +        state = MergeState.WAITING_FOR_OFFLINE;
 +      } else {
 +        log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getExtent());
 +      }
 +    }
 +    if (state == MergeState.WAITING_FOR_OFFLINE) {
 +      if (chopped != needsToBeChopped) {
 +        log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getExtent());
 +        // Perhaps a split occurred after we chopped, but before we went offline: start over
 +        state = MergeState.WAITING_FOR_CHOPPED;
 +      } else {
 +        log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getExtent());
 +        if (unassigned == total && chopped == needsToBeChopped) {
 +          if (verifyMergeConsistency(connector, master))
 +            state = MergeState.MERGING;
 +          else
 +            log.info("Merge consistency check failed " + info.getExtent());
 +        } else {
 +          log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getExtent());
 +        }
 +      }
 +    }
 +    if (state == MergeState.MERGING) {
 +      if (hosted != 0) {
 +        // Shouldn't happen
 +        log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getExtent());
 +        state = MergeState.WAITING_FOR_OFFLINE;
 +      }
 +      if (unassigned != total) {
 +        // Shouldn't happen
 +        log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getExtent());
 +        state = MergeState.WAITING_FOR_CHOPPED;
 +      }
 +      log.info(unassigned + " tablets are unassigned " + info.getExtent());
 +    }
 +    return state;
 +  }
 +  
 +  private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
 +    MergeStats verify = new MergeStats(info);
 +    KeyExtent extent = info.getExtent();
 +    Scanner scanner = connector.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
 +    MetaDataTableScanner.configureScanner(scanner, master);
 +    Text start = extent.getPrevEndRow();
 +    if (start == null) {
 +      start = new Text();
 +    }
 +    Text tableId = extent.getTableId();
 +    Text first = KeyExtent.getMetadataEntry(tableId, start);
 +    Range range = new Range(first, false, null, true);
 +    scanner.setRange(range);
 +    KeyExtent prevExtent = null;
 +    
 +    log.debug("Scanning range " + range);
 +    for (Entry<Key,Value> entry : scanner) {
 +      TabletLocationState tls;
 +      try {
 +        tls = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
 +      } catch (BadLocationStateException e) {
 +        log.error(e, e);
 +        return false;
 +      }
 +      log.debug("consistency check: " + tls + " walogs " + tls.walogs.size());
 +      if (!tls.extent.getTableId().equals(tableId)) {
 +        break;
 +      }
 +      
 +      if (!tls.walogs.isEmpty() && verify.getMergeInfo().needsToBeChopped(tls.extent)) {
 +        log.debug("failing consistency: needs to be chopped" + tls.extent);
 +        return false;
 +      }
 +      
 +      if (prevExtent == null) {
 +        // this is the first tablet observed, it must be offline and its prev row must be less than the start of the merge range
 +        if (tls.extent.getPrevEndRow() != null && tls.extent.getPrevEndRow().compareTo(start) > 0) {
 +          log.debug("failing consistency: prev row is too high " + start);
 +          return false;
 +        }
 +        
 +        if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED) {
 +          log.debug("failing consistency: assigned or hosted " + tls);
 +          return false;
 +        }
 +        
 +      } else if (!tls.extent.isPreviousExtent(prevExtent)) {
 +        log.debug("hole in " + MetadataTable.NAME);
 +        return false;
 +      }
 +      
 +      prevExtent = tls.extent;
 +      
 +      verify.update(tls.extent, tls.getState(master.onlineTabletServers()), tls.chopped, !tls.walogs.isEmpty());
 +      // stop when we've seen the tablet just beyond our range
 +      if (tls.extent.getPrevEndRow() != null && extent.getEndRow() != null && tls.extent.getPrevEndRow().compareTo(extent.getEndRow()) > 0) {
 +        break;
 +      }
 +    }
 +    log.debug("chopped " + chopped + " v.chopped " + verify.chopped + " unassigned " + unassigned + " v.unassigned " + verify.unassigned + " verify.total "
 +        + verify.total);
 +    return chopped == verify.chopped && unassigned == verify.unassigned && unassigned == verify.total;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    ClientOpts opts = new ClientOpts();
 +    opts.parseArgs(MergeStats.class.getName(), args);
 +    
 +    Connector conn = opts.getConnector();
 +    Map<String,String> tableIdMap = conn.tableOperations().tableIdMap();
-     for (String table : tableIdMap.keySet()) {
-       String tableId = tableIdMap.get(table);
-       String path = ZooUtil.getRoot(conn.getInstance().getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
++    for (Entry<String,String> entry : tableIdMap.entrySet()) {
++      final String table = entry.getKey(), tableId = entry.getValue();
++      String path = ZooUtil.getRoot(conn.getInstance().getInstanceID()) + Constants.ZTABLES + "/" + tableId + "/merge";
 +      MergeInfo info = new MergeInfo();
 +      if (ZooReaderWriter.getInstance().exists(path)) {
 +        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
 +        DataInputBuffer in = new DataInputBuffer();
 +        in.reset(data, data.length);
 +        info.readFields(in);
 +      }
 +      System.out.println(String.format("%25s  %10s %10s %s", table, info.getState(), info.getOperation(), info.getExtent()));
 +    }
 +  }
 +}
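
[Editor's note] On the 1.6.0 side of this hunk, main() iterates tableIdMap.entrySet() instead of walking keySet() and calling get() for every key, so each table's id is read in a single pass. A small standalone sketch of the two patterns (table names and ids below are made up):

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

public class EntrySetExample {
  public static void main(String[] args) {
    Map<String,String> tableIdMap = new HashMap<String,String>();
    tableIdMap.put("trades", "1");   // illustrative table name / id pairs
    tableIdMap.put("quotes", "2");

    // keySet() form: one extra hash lookup per key inside the loop
    for (String table : tableIdMap.keySet()) {
      String tableId = tableIdMap.get(table);
      System.out.println(table + " -> " + tableId);
    }

    // entrySet() form: key and value come from the same entry
    for (Entry<String,String> entry : tableIdMap.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}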

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
index eff8baa,0000000..e28dcad
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
@@@ -1,48 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.state;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +public class SetGoalState {
 +  
 +  /**
 +   * Utility program that will change the goal state for the master from the command line.
 +   */
 +  public static void main(String[] args) throws Exception {
 +    if (args.length != 1 || MasterGoalState.valueOf(args[0]) == null) {
 +      System.err.println("Usage: accumulo " + SetGoalState.class.getName() + " [NORMAL|SAFE_MODE|CLEAN_STOP]");
 +      System.exit(-1);
 +    }
 +    SecurityUtil.serverLogin();
 +
 +    VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.waitForZookeeperAndHdfs(fs);
-     ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
++    ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(Constants.UTF8),
 +        NodeExistsPolicy.OVERWRITE);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 1388c70,0000000..430f14d
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -1,613 -1,0 +1,613 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import java.io.BufferedReader;
 +import java.io.BufferedWriter;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.OutputStreamWriter;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.impl.ServerClient;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 +import org.apache.accumulo.trace.instrument.TraceExecutorService;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.MapFile;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +
 +/*
 + * Bulk import makes requests of tablet servers, and those requests can take a
 + * long time. Our communications to the tablet server may fail, so we won't know
 + * the status of the request. The master will repeat failed requests so now
 + * there are multiple requests to the tablet server. The tablet server will not
 + * execute the request multiple times, so long as the marker it wrote in the
 + * metadata table stays there. The master needs to know when all requests have
 + * finished so it can remove the markers. Did it start? Did it finish? We can see
 + * that *a* request completed by seeing the flag written into the metadata
 + * table, but we won't know if some other rogue thread is still waiting to start
 + * a thread and repeat the operation.
 + * 
 + * The master can ask the tablet server if it has any requests still running.
 + * Except the tablet server might have some thread about to start a request, but
 + * before it has made any bookkeeping about the request. To prevent problems
 + * like this, an Arbitrator is used. Before starting any new request, the tablet
 + * server checks the Arbitrator to see if the request is still valid.
 + * 
 + */
 +
 +public class BulkImport extends MasterRepo {
 +  public static final String FAILURES_TXT = "failures.txt";
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private static final Logger log = Logger.getLogger(BulkImport.class);
 +  
 +  private String tableId;
 +  private String sourceDir;
 +  private String errorDir;
 +  private boolean setTime;
 +  
 +  public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.sourceDir = sourceDir;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (!Utils.getReadLock(tableId, tid).tryLock())
 +      return 100;
 +    
 +    Instance instance = HdfsZooInstance.getInstance();
 +    Tables.clearCache(instance);
 +    if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
 +      long reserve1, reserve2;
 +      reserve1 = reserve2 = Utils.reserveHdfsDirectory(sourceDir, tid);
 +      if (reserve1 == 0)
 +        reserve2 = Utils.reserveHdfsDirectory(errorDir, tid);
 +      return reserve2;
 +    } else {
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
 +    }
 +  }
 +  
 +  @Override
 +  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +  @SuppressWarnings("deprecation")
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    log.debug(" tid " + tid + " sourceDir " + sourceDir);
 +    
 +    Utils.getReadLock(tableId, tid).lock();
 +    
 +    // check that the error directory exists and is empty
 +    VolumeManager fs = master.getFileSystem();
 +    
 +    Path errorPath = new Path(errorDir);
 +    FileStatus errorStatus = null;
 +    try {
 +      errorStatus = fs.getFileStatus(errorPath);
 +    } catch (FileNotFoundException ex) {
 +      // ignored
 +    }
 +    if (errorStatus == null)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " does not exist");
 +    if (!errorStatus.isDir())
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " is not a directory");
 +    if (fs.listStatus(errorPath).length != 0)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
 +          + " is not empty");
 +    
 +    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
 +    
 +    // move the files into the directory
 +    try {
 +      String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
 +      log.debug(" tid " + tid + " bulkDir " + bulkDir);
 +      return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
 +    } catch (IOException ex) {
 +      log.error("error preparing the bulk import directory", ex);
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": "
 +          + ex);
 +    }
 +  }
 +  
 +  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
 +    
 +    String tableDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs()).toString();
 +    
 +    if (tableDir == null)
 +      throw new IllegalStateException(sourceDir + " is not in a known namespace");
 +    Path directory = new Path(tableDir + "/" + tableId);
 +    fs.mkdirs(directory);
 +    
 +    // only one should be able to create the lock file
 +    // the purpose of the lock file is to avoid a race
 +    // condition between the call to fs.exists() and
 +    // fs.mkdirs()... if only hadoop had a mkdir() function
 +    // that failed when the dir existed
 +    
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +    
 +    while (true) {
 +      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
 +      if (fs.exists(newBulkDir)) // sanity check
 +        throw new IllegalStateException("Dir exist when it should not " + newBulkDir);
 +      if (fs.mkdirs(newBulkDir))
 +        return newBulkDir;
 +      log.warn("Failed to create " + newBulkDir + " for unknown reason");
 +      
 +      UtilWaitThread.sleep(3000);
 +    }
 +  }
 +
 +  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +  @SuppressWarnings("deprecation")
 +  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
 +    Path bulkDir = createNewBulkDir(fs, tableId);
 +    
 +    MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 +    
 +    Path dirPath = new Path(dir);
 +    FileStatus[] mapFiles = fs.listStatus(dirPath);
 +    
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +    
 +    for (FileStatus fileStatus : mapFiles) {
 +      String sa[] = fileStatus.getPath().getName().split("\\.");
 +      String extension = "";
 +      if (sa.length > 1) {
 +        extension = sa[sa.length - 1];
 +        
 +        if (!FileOperations.getValidExtensions().contains(extension)) {
 +          log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
 +          continue;
 +        }
 +      } else {
 +        // assume it is a map file
 +        extension = Constants.MAPFILE_EXTENSION;
 +      }
 +      
 +      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
 +        if (!fileStatus.isDir()) {
 +          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +          continue;
 +        }
 +        
 +        if (fileStatus.getPath().getName().equals("_logs")) {
 +          log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
 +          continue;
 +        }
 +        try {
 +          FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
 +          if (dataStatus.isDir()) {
 +            log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +            continue;
 +          }
 +        } catch (FileNotFoundException fnfe) {
 +          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
 +          continue;
 +        }
 +      }
 +      
 +      String newName = "I" + namer.getNextName() + "." + extension;
 +      Path newPath = new Path(bulkDir, newName);
 +      try {
 +        fs.rename(fileStatus.getPath(), newPath);
 +        log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
 +      } catch (IOException E1) {
 +        log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
 +      }
 +    }
 +    return bulkDir.toString();
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    // unreserve source/error directories
 +    Utils.unreserveHdfsDirectory(sourceDir, tid);
 +    Utils.unreserveHdfsDirectory(errorDir, tid);
 +    Utils.getReadLock(tableId, tid).unlock();
 +  }
 +}
 +
 +class CleanUpBulkImport extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
 +  
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +  
 +  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    log.debug("removing the bulk processing flag file in " + bulk);
 +    Path bulkDir = new Path(bulk);
 +    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 +    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
 +    log.debug("removing the metadata table markers for loaded files");
 +    Connector conn = master.getConnector();
 +    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
 +    log.debug("releasing HDFS reservations for " + source + " and " + error);
 +    Utils.unreserveHdfsDirectory(source, tid);
 +    Utils.unreserveHdfsDirectory(error, tid);
 +    Utils.getReadLock(tableId, tid).unlock();
 +    log.debug("completing bulk import transaction " + tid);
 +    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
 +    return null;
 +  }
 +}
 +
 +class CompleteBulkImport extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +  
 +  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
 +    return new CopyFailed(tableId, source, bulk, error);
 +  }
 +}
 +
 +class CopyFailed extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String error;
 +  
 +  public CopyFailed(String tableId, String source, String bulk, String error) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.error = error;
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    Set<TServerInstance> finished = new HashSet<TServerInstance>();
 +    Set<TServerInstance> running = master.onlineTabletServers();
 +    for (TServerInstance server : running) {
 +      try {
 +        TServerConnection client = master.getConnection(server);
 +        if (client != null && !client.isActive(tid))
 +          finished.add(server);
 +      } catch (TException ex) {
 +        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
 +      }
 +    }
 +    if (finished.containsAll(running))
 +      return 0;
 +    return 500;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    // This needs to execute after the arbiter is stopped
 +    
 +    VolumeManager fs = master.getFileSystem();
 +    
 +    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
 +      return new CleanUpBulkImport(tableId, source, bulk, error);
 +    
 +    HashMap<String,String> failures = new HashMap<String,String>();
 +    HashMap<String,String> loadedFailures = new HashMap<String,String>();
 +    
 +    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
-     BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
++    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
 +    try {
 +      String line = null;
 +      while ((line = in.readLine()) != null) {
 +        Path path = new Path(line);
 +        if (!fs.exists(new Path(error, path.getName())))
 +          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
 +      }
 +    } finally {
 +      failFile.close();
 +    }
 +    
 +    /*
 +     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
 +     * have no loaded markers.
 +     */
 +    
 +    // determine which failed files were loaded
 +    Connector conn = master.getConnector();
 +    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 +    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
 +    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 +    
 +    for (Entry<Key,Value> entry : mscanner) {
 +      if (Long.parseLong(entry.getValue().toString()) == tid) {
 +        String loadedFile = entry.getKey().getColumnQualifier().toString();
 +        String absPath = failures.remove(loadedFile);
 +        if (absPath != null) {
 +          loadedFailures.put(loadedFile, absPath);
 +        }
 +      }
 +    }
 +    
 +    // move failed files that were not loaded
 +    for (String failure : failures.values()) {
 +      Path orig = new Path(failure);
 +      Path dest = new Path(error, orig.getName());
 +      fs.rename(orig, dest);
 +      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
 +    }
 +    
 +    if (loadedFailures.size() > 0) {
 +      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
 +          + Constants.ZBULK_FAILED_COPYQ);
 +      
 +      HashSet<String> workIds = new HashSet<String>();
 +      
 +      for (String failure : loadedFailures.values()) {
 +        Path orig = new Path(failure);
 +        Path dest = new Path(error, orig.getName());
 +        
 +        if (fs.exists(dest))
 +          continue;
 +        
-         bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
++        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
 +        workIds.add(orig.getName());
 +        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
 +      }
 +      
 +      bifCopyQueue.waitUntilDone(workIds);
 +    }
 +    
 +    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
 +    return new CleanUpBulkImport(tableId, source, bulk, error);
 +  }
 +  
 +}
 +
 +class LoadFiles extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private static ExecutorService threadPool = null;
 +  private static final Logger log = Logger.getLogger(BulkImport.class);
 +  
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String errorDir;
 +  private boolean setTime;
 +  
 +  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (master.onlineTabletServers().size() == 0)
 +      return 500;
 +    return 0;
 +  }
 +  
 +  synchronized void initializeThreadPool(Master master) {
 +    if (threadPool == null) {
 +      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 +      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 +      pool.allowCoreThreadTimeOut(true);
 +      threadPool = new TraceExecutorService(pool);
 +    }
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(final long tid, final Master master) throws Exception {
 +    initializeThreadPool(master);
 +    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
 +    VolumeManager fs = master.getFileSystem();
 +    List<FileStatus> files = new ArrayList<FileStatus>();
 +    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 +      files.add(entry);
 +    }
 +    log.debug("tid " + tid + " importing " + files.size() + " files");
 +    
 +    Path writable = new Path(this.errorDir, ".iswritable");
 +    if (!fs.createNewFile(writable)) {
 +      // Maybe this is a re-try... clear the flag and try again
 +      fs.delete(writable);
 +      if (!fs.createNewFile(writable))
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +            "Unable to write to " + this.errorDir);
 +    }
 +    fs.delete(writable);
 +    
 +    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 +    for (FileStatus f : files)
 +      filesToLoad.add(f.getPath().toString());
 +    
 +    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 +    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 +      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 +      
 +      if (master.onlineTabletServers().size() == 0)
 +        log.warn("There are no tablet servers to process bulk import, waiting (tid = " + tid + ")");
 +      
 +      while (master.onlineTabletServers().size() == 0) {
 +        UtilWaitThread.sleep(500);
 +      }
 +      
 +      // Use the threadpool to assign files one-at-a-time to the server
 +      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 +      for (final String file : filesToLoad) {
 +        results.add(threadPool.submit(new Callable<List<String>>() {
 +          @Override
 +          public List<String> call() {
 +            List<String> failures = new ArrayList<String>();
 +            ClientService.Client client = null;
 +            String server = null;
 +            try {
 +              // get a connection to a random tablet server, do not prefer cached connections because
 +              // this is running on the master and there are lots of connections to tablet servers
 +              // serving the metadata tablets
 +              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 +              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
 +              client = pair.getSecond();
 +              server = pair.getFirst();
 +              List<String> attempt = Collections.singletonList(file);
 +              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
 +              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
 +                  errorDir, setTime);
 +              if (fail.isEmpty()) {
 +                loaded.add(file);
 +              } else {
 +                failures.addAll(fail);
 +              }
 +            } catch (Exception ex) {
 +              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 +            } finally {
 +              ServerClient.close(client);
 +            }
 +            return failures;
 +          }
 +        }));
 +      }
 +      Set<String> failures = new HashSet<String>();
 +      for (Future<List<String>> f : results)
 +        failures.addAll(f.get());
 +      filesToLoad.removeAll(loaded);
 +      if (filesToLoad.size() > 0) {
 +        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 +        UtilWaitThread.sleep(100);
 +      }
 +    }
 +    
 +    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-     BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
++    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
 +    try {
 +      for (String f : filesToLoad) {
 +        out.write(f);
 +        out.write("\n");
 +      }
 +    } finally {
 +      out.close();
 +    }
 +    
 +    // return the next step, which will perform cleanup
 +    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 +  }
 +  
 +  static String sampleList(Collection<?> potentiallyLongList, int max) {
 +    StringBuffer result = new StringBuffer();
 +    result.append("[");
 +    int i = 0;
 +    for (Object obj : potentiallyLongList) {
 +      result.append(obj);
 +      if (i >= max) {
 +        result.append("...");
 +        break;
 +      } else {
 +        result.append(", ");
 +      }
 +      i++;
 +    }
 +    if (i < max)
 +      result.delete(result.length() - 2, result.length());
 +    result.append("]");
 +    return result.toString();
 +  }
 +  
 +}
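For context, LoadFiles and CompleteBulkImport above run on the master as steps of the FATE operation that a client bulk import starts. A minimal client-side sketch of triggering that operation, assuming a reachable instance and using placeholder instance, credential, table, and directory names (all hypothetical):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class BulkImportExample {
      public static void main(String[] args) throws Exception {
        // Placeholder connection details; substitute real values.
        Instance inst = new ZooKeeperInstance("myInstance", "zkhost:2181");
        Connector conn = inst.getConnector("root", new PasswordToken("secret"));

        // The source directory holds the RFiles to import; the failure directory
        // must exist and be empty. Files that cannot be loaded end up listed in
        // failures.txt under the failure directory, which is the file the code
        // above writes and reads back during cleanup.
        conn.tableOperations().importDirectory("mytable", "/tmp/bulk", "/tmp/bulk_fail", false);
      }
    }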

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index d1c3c40,0000000..49227ef
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@@ -1,106 -1,0 +1,105 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +class FinishCancelCompaction extends MasterRepo {
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +  
 +  public FinishCancelCompaction(String tableId) {
 +    this.tableId = tableId;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    Utils.getReadLock(tableId, tid).unlock();
 +    return null;
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    
 +  }
 +}
 +
 +/**
 + * Cancel a user-initiated compaction on a table.
 + */
 +public class CancelCompactions extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +  private String namespaceId;
 +  
 +  public CancelCompactions(String tableId) {
 +    this.tableId = tableId;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL)
 +        + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    String zCompactID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +    String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
 +        + Constants.ZTABLE_COMPACT_CANCEL_ID;
 +    
 +    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
 +    
 +    byte[] currentValue = zoo.getData(zCompactID, null);
 +    
-     String cvs = new String(currentValue);
++    String cvs = new String(currentValue, Constants.UTF8);
 +    String[] tokens = cvs.split(",");
-     final long flushID = Long.parseLong(new String(tokens[0]));
++    final long flushID = Long.parseLong(tokens[0]);
 +    
 +    zoo.mutate(zCancelID, null, null, new Mutator() {
 +      @Override
 +      public byte[] mutate(byte[] currentValue) throws Exception {
-         long cid = Long.parseLong(new String(currentValue));
++        long cid = Long.parseLong(new String(currentValue, Constants.UTF8));
 +        
 +        if (cid < flushID)
-           return (flushID + "").getBytes();
++          return Long.toString(flushID).getBytes(Constants.UTF8);
 +        else
-           return (cid + "").getBytes();
-         
++          return Long.toString(cid).getBytes(Constants.UTF8);
 +      }
 +    });
 +    
 +    return new FinishCancelCompaction(tableId);
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    Utils.unreserveNamespace(namespaceId, tid, false);
 +    Utils.unreserveTable(tableId, tid, false);
 +  }
 +}
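The cancel id that the Mutator above ratchets forward is what a client-requested cancel relies on. Assuming the client API in this release exposes it, a one-line usage sketch (connection setup as in the earlier bulk import example, table name hypothetical):

    // Starts the CancelCompactions FATE op, which advances the table's
    // compact-cancel id in ZooKeeper so running user compactions stop.
    conn.tableOperations().cancelCompaction("mytable");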

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index 6081ff2,0000000..e3b0405
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@@ -1,400 -1,0 +1,400 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.commons.codec.binary.Hex;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +
 +class CompactionDriver extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private long compactId;
 +  private String tableId;
 +  private byte[] startRow;
 +  private byte[] endRow;
 +  private String namespaceId;
 +
 +  public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {
 +
 +    this.compactId = compactId;
 +    this.tableId = tableId;
 +    this.startRow = startRow;
 +    this.endRow = endRow;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +
 +    String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
 +        + Constants.ZTABLE_COMPACT_CANCEL_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
 +
 +    if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) {
 +      // compaction was canceled
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
 +    }
 +
 +    MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
 +    Connector conn = master.getConnector();
 +
 +    Scanner scanner;
 +
 +    if (tableId.equals(MetadataTable.ID)) {
 +      scanner = new IsolatedScanner(conn.createScanner(RootTable.NAME, Authorizations.EMPTY));
 +      scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    } else {
 +      scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 +      Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
 +      scanner.setRange(range);
 +    }
 +
 +    TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
 +    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
 +    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
 +
 +    long t1 = System.currentTimeMillis();
 +    RowIterator ri = new RowIterator(scanner);
 +
 +    int tabletsToWaitFor = 0;
 +    int tabletCount = 0;
 +
 +    while (ri.hasNext()) {
 +      Iterator<Entry<Key,Value>> row = ri.next();
 +      long tabletCompactID = -1;
 +
 +      TServerInstance server = null;
 +
 +      Entry<Key,Value> entry = null;
 +      while (row.hasNext()) {
 +        entry = row.next();
 +        Key key = entry.getKey();
 +
 +        if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
 +          tabletCompactID = Long.parseLong(entry.getValue().toString());
 +
 +        if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
 +          server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
 +      }
 +
 +      if (tabletCompactID < compactId) {
 +        tabletsToWaitFor++;
 +        if (server != null)
 +          serversToFlush.increment(server, 1);
 +      }
 +
 +      tabletCount++;
 +
 +      Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
 +      if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
 +        break;
 +    }
 +
 +    long scanTime = System.currentTimeMillis() - t1;
 +
 +    Instance instance = master.getInstance();
 +    Tables.clearCache(instance);
 +    if (tabletCount == 0 && !Tables.exists(instance, tableId))
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +
 +    if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);
 +
 +    if (tabletsToWaitFor == 0)
 +      return 0;
 +
 +    for (TServerInstance tsi : serversToFlush.keySet()) {
 +      try {
 +        final TServerConnection server = master.getConnection(tsi);
 +        if (server != null)
 +          server.compact(master.getMasterLock(), tableId, startRow, endRow);
 +      } catch (TException ex) {
 +        Logger.getLogger(CompactionDriver.class).error(ex.toString());
 +      }
 +    }
 +
 +    long sleepTime = 500;
 +
 +    if (serversToFlush.size() > 0)
 +      sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to
 +                                                                        // compact
 +
 +    sleepTime = Math.max(2 * scanTime, sleepTime);
 +
 +    sleepTime = Math.min(sleepTime, 30000);
 +
 +    return sleepTime;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    CompactRange.removeIterators(tid, tableId);
 +    Utils.getReadLock(tableId, tid).unlock();
 +    Utils.getReadLock(namespaceId, tid).unlock();
 +    return null;
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +
 +  }
 +
 +}
 +
 +public class CompactRange extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +  private String tableId;
 +  private byte[] startRow;
 +  private byte[] endRow;
 +  private byte[] iterators;
 +  private String namespaceId;
 +
 +  public static class CompactionIterators implements Writable {
 +    byte[] startRow;
 +    byte[] endRow;
 +    List<IteratorSetting> iterators;
 +
 +    public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
 +      this.startRow = startRow;
 +      this.endRow = endRow;
 +      this.iterators = iterators;
 +    }
 +
 +    public CompactionIterators() {
 +      startRow = null;
 +      endRow = null;
 +      iterators = Collections.emptyList();
 +    }
 +
 +    @Override
 +    public void write(DataOutput out) throws IOException {
 +      out.writeBoolean(startRow != null);
 +      if (startRow != null) {
 +        out.writeInt(startRow.length);
 +        out.write(startRow);
 +      }
 +
 +      out.writeBoolean(endRow != null);
 +      if (endRow != null) {
 +        out.writeInt(endRow.length);
 +        out.write(endRow);
 +      }
 +
 +      out.writeInt(iterators.size());
 +      for (IteratorSetting is : iterators) {
 +        is.write(out);
 +      }
 +    }
 +
 +    @Override
 +    public void readFields(DataInput in) throws IOException {
 +      if (in.readBoolean()) {
 +        startRow = new byte[in.readInt()];
 +        in.readFully(startRow);
 +      } else {
 +        startRow = null;
 +      }
 +
 +      if (in.readBoolean()) {
 +        endRow = new byte[in.readInt()];
 +        in.readFully(endRow);
 +      } else {
 +        endRow = null;
 +      }
 +
 +      int num = in.readInt();
 +      iterators = new ArrayList<IteratorSetting>(num);
 +
 +      for (int i = 0; i < num; i++) {
 +        iterators.add(new IteratorSetting(in));
 +      }
 +    }
 +
 +    public Text getEndRow() {
 +      if (endRow == null)
 +        return null;
 +      return new Text(endRow);
 +    }
 +
 +    public Text getStartRow() {
 +      if (startRow == null)
 +        return null;
 +      return new Text(startRow);
 +    }
 +
 +    public List<IteratorSetting> getIterators() {
 +      return iterators;
 +    }
 +  }
 +
 +  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
 +    this.tableId = tableId;
 +    this.startRow = startRow.length == 0 ? null : startRow;
 +    this.endRow = endRow.length == 0 ? null : endRow;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespaceId(inst, tableId);
 +
 +    if (iterators.size() > 0) {
 +      this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
 +    } else {
 +      iterators = null;
 +    }
 +
 +    if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
 +          "start row must be less than end row");
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT)
 +        + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, Master environment) throws Exception {
 +    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
 +    byte[] cid;
 +    try {
 +      cid = zoo.mutate(zTablePath, null, null, new Mutator() {
 +        @Override
 +        public byte[] mutate(byte[] currentValue) throws Exception {
-           String cvs = new String(currentValue);
++          String cvs = new String(currentValue, Constants.UTF8);
 +          String[] tokens = cvs.split(",");
-           long flushID = Long.parseLong(new String(tokens[0]));
++          long flushID = Long.parseLong(tokens[0]);
 +          flushID++;
 +
 +          String txidString = String.format("%016x", tid);
 +
 +          for (int i = 1; i < tokens.length; i++) {
 +            if (tokens[i].startsWith(txidString))
 +              continue; // skip self
 +
 +            throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
 +                "Another compaction with iterators is running");
 +          }
 +
 +          StringBuilder encodedIterators = new StringBuilder();
 +
 +          if (iterators != null) {
 +            Hex hex = new Hex();
 +            encodedIterators.append(",");
 +            encodedIterators.append(txidString);
 +            encodedIterators.append("=");
-             encodedIterators.append(new String(hex.encode(iterators)));
++            encodedIterators.append(new String(hex.encode(iterators), Constants.UTF8));
 +          }
 +
-           return ("" + flushID + encodedIterators).getBytes();
++          return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
 +        }
 +      });
 +
-       return new CompactionDriver(Long.parseLong(new String(cid).split(",")[0]), tableId, startRow, endRow);
++      return new CompactionDriver(Long.parseLong(new String(cid, Constants.UTF8).split(",")[0]), tableId, startRow, endRow);
 +    } catch (NoNodeException nne) {
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +    }
 +
 +  }
 +
 +  static void removeIterators(final long txid, String tableId) throws Exception {
 +    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
 +
 +    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
 +
 +    zoo.mutate(zTablePath, null, null, new Mutator() {
 +      @Override
 +      public byte[] mutate(byte[] currentValue) throws Exception {
-         String cvs = new String(currentValue);
++        String cvs = new String(currentValue, Constants.UTF8);
 +        String[] tokens = cvs.split(",");
-         long flushID = Long.parseLong(new String(tokens[0]));
++        long flushID = Long.parseLong(tokens[0]);
 +
 +        String txidString = String.format("%016x", txid);
 +
 +        StringBuilder encodedIterators = new StringBuilder();
 +        for (int i = 1; i < tokens.length; i++) {
 +          if (tokens[i].startsWith(txidString))
 +            continue;
 +          encodedIterators.append(",");
 +          encodedIterators.append(tokens[i]);
 +        }
 +
-         return ("" + flushID + encodedIterators).getBytes();
++        return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
 +      }
 +    });
 +
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    try {
 +      removeIterators(tid, tableId);
 +    } finally {
 +      Utils.unreserveNamespace(namespaceId, tid, false);
 +      Utils.unreserveTable(tableId, tid, false);
 +    }
 +  }
 +
 +}
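The ZTABLE_COMPACT_ID value that both Mutators above maintain follows the convention "<compactId>[,<txid>=<hex-encoded CompactionIterators>]...". A small standalone sketch of decoding that layout, using a hypothetical helper class and the JDK charset constant rather than Constants.UTF8 so it compiles on its own:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    class CompactIdValue {
      // Parses "<compactId>[,<txid>=<hex-encoded CompactionIterators>]..."
      static long compactId(byte[] znodeValue) {
        return Long.parseLong(new String(znodeValue, StandardCharsets.UTF_8).split(",")[0]);
      }

      // Returns the per-transaction iterator entries, keyed by the 16-digit hex txid.
      static Map<String,String> iteratorEntries(byte[] znodeValue) {
        String[] tokens = new String(znodeValue, StandardCharsets.UTF_8).split(",");
        Map<String,String> entries = new HashMap<String,String>();
        for (int i = 1; i < tokens.length; i++) {
          int eq = tokens[i].indexOf('=');
          entries.put(tokens[i].substring(0, eq), tokens[i].substring(eq + 1));
        }
        return entries;
      }
    }

For example, a value of "5,00000000000000a1=ab12" would decode to compact id 5 with one pending iterator configuration registered under transaction 00000000000000a1.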

