accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [10/59] [abbrv] ACCUMULO-658 Move master to its own module
Date Sat, 07 Sep 2013 03:28:13 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
new file mode 100644
index 0000000..9312cce
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.CurrentState;
+import org.apache.accumulo.master.state.MergeInfo;
+import org.apache.accumulo.master.state.MergeState;
+import org.apache.accumulo.master.state.MetaDataTableScanner;
+import org.apache.accumulo.master.state.TabletLocationState;
+import org.apache.accumulo.master.state.TabletState;
+import org.apache.accumulo.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+public class MergeStats {
+  final static private Logger log = Logger.getLogger(MergeStats.class);
+  MergeInfo info;
+  int hosted = 0;
+  int unassigned = 0;
+  int chopped = 0;
+  int needsToBeChopped = 0;
+  int total = 0;
+  boolean lowerSplit = false;
+  boolean upperSplit = false;
+  
+  public MergeStats(MergeInfo info) {
+    this.info = info;
+    if (info.getState().equals(MergeState.NONE))
+      return;
+    if (info.getExtent().getEndRow() == null)
+      upperSplit = true;
+    if (info.getExtent().getPrevEndRow() == null)
+      lowerSplit = true;
+  }
+  
+  public MergeInfo getMergeInfo() {
+    return info;
+  }
+  
+  public void update(KeyExtent ke, TabletState state, boolean chopped, boolean hasWALs) {
+    if (info.getState().equals(MergeState.NONE))
+      return;
+    if (!upperSplit && info.getExtent().getEndRow().equals(ke.getPrevEndRow())) {
+      log.info("Upper split found");
+      upperSplit = true;
+    }
+    if (!lowerSplit && info.getExtent().getPrevEndRow().equals(ke.getEndRow())) {
+      log.info("Lower split found");
+      lowerSplit = true;
+    }
+    if (!info.overlaps(ke))
+      return;
+    if (info.needsToBeChopped(ke)) {
+      this.needsToBeChopped++;
+      if (chopped) {
+        if (state.equals(TabletState.HOSTED)) {
+          this.chopped++;
+        } else if (!hasWALs) {
+          this.chopped++;
+        }
+      }
+    }
+    this.total++;
+    if (state.equals(TabletState.HOSTED))
+      this.hosted++;
+    if (state.equals(TabletState.UNASSIGNED))
+      this.unassigned++;
+  }
+  
+  public MergeState nextMergeState(Connector connector, CurrentState master) throws Exception {
+    MergeState state = info.getState();
+    if (state == MergeState.NONE)
+      return state;
+    if (total == 0) {
+      log.trace("failed to see any tablets for this range, ignoring " + info.getExtent());
+      return state;
+    }
+    log.info("Computing next merge state for " + info.getExtent() + " which is presently " + state + " isDelete : " + info.isDelete());
+    if (state == MergeState.STARTED) {
+      state = MergeState.SPLITTING;
+    }
+    if (state == MergeState.SPLITTING) {
+      log.info(hosted + " are hosted, total " + total);
+      if (!info.isDelete() && total == 1) {
+        log.info("Merge range is already contained in a single tablet " + info.getExtent());
+        state = MergeState.COMPLETE;
+      } else if (hosted == total) {
+        if (info.isDelete()) {
+          if (!lowerSplit)
+            log.info("Waiting for " + info + " lower split to occur " + info.getExtent());
+          else if (!upperSplit)
+            log.info("Waiting for " + info + " upper split to occur " + info.getExtent());
+          else
+            state = MergeState.WAITING_FOR_CHOPPED;
+        } else {
+          state = MergeState.WAITING_FOR_CHOPPED;
+        }
+      } else {
+        log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getExtent());
+      }
+    }
+    if (state == MergeState.WAITING_FOR_CHOPPED) {
+      log.info(chopped + " tablets are chopped " + info.getExtent());
+      if (chopped == needsToBeChopped) {
+        state = MergeState.WAITING_FOR_OFFLINE;
+      } else {
+        log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getExtent());
+      }
+    }
+    if (state == MergeState.WAITING_FOR_OFFLINE) {
+      if (chopped != needsToBeChopped) {
+        log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getExtent());
+        // Perhaps a split occurred after we chopped, but before we went offline: start over
+        state = MergeState.WAITING_FOR_CHOPPED;
+      } else {
+        log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getExtent());
+        if (unassigned == total && chopped == needsToBeChopped) {
+          if (verifyMergeConsistency(connector, master))
+            state = MergeState.MERGING;
+          else
+            log.info("Merge consistency check failed " + info.getExtent());
+        } else {
+          log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getExtent());
+        }
+      }
+    }
+    if (state == MergeState.MERGING) {
+      if (hosted != 0) {
+        // Shouldn't happen
+        log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getExtent());
+        state = MergeState.WAITING_FOR_OFFLINE;
+      }
+      if (unassigned != total) {
+        // Shouldn't happen
+        log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getExtent());
+        state = MergeState.WAITING_FOR_CHOPPED;
+      }
+      log.info(unassigned + " tablets are unassigned " + info.getExtent());
+    }
+    return state;
+  }
+  
+  private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
+    MergeStats verify = new MergeStats(info);
+    KeyExtent extent = info.getExtent();
+    Scanner scanner = connector.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+    MetaDataTableScanner.configureScanner(scanner, master);
+    Text start = extent.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    Text tableId = extent.getTableId();
+    Text first = KeyExtent.getMetadataEntry(tableId, start);
+    Range range = new Range(first, false, null, true);
+    scanner.setRange(range);
+    KeyExtent prevExtent = null;
+    
+    log.debug("Scanning range " + range);
+    for (Entry<Key,Value> entry : scanner) {
+      TabletLocationState tls;
+      try {
+        tls = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+      } catch (BadLocationStateException e) {
+        log.error(e, e);
+        return false;
+      }
+      log.debug("consistency check: " + tls + " walogs " + tls.walogs.size());
+      if (!tls.extent.getTableId().equals(tableId)) {
+        break;
+      }
+      
+      if (!tls.walogs.isEmpty() && verify.getMergeInfo().needsToBeChopped(tls.extent)) {
+        log.debug("failing consistency: needs to be chopped" + tls.extent);
+        return false;
+      }
+      
+      if (prevExtent == null) {
+        // this is the first tablet observed, it must be offline and its prev row must be less than the start of the merge range
+        if (tls.extent.getPrevEndRow() != null && tls.extent.getPrevEndRow().compareTo(start) > 0) {
+          log.debug("failing consistency: prev row is too high " + start);
+          return false;
+        }
+        
+        if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED) {
+          log.debug("failing consistency: assigned or hosted " + tls);
+          return false;
+        }
+        
+      } else if (!tls.extent.isPreviousExtent(prevExtent)) {
+        log.debug("hole in " + MetadataTable.NAME);
+        return false;
+      }
+      
+      prevExtent = tls.extent;
+      
+      verify.update(tls.extent, tls.getState(master.onlineTabletServers()), tls.chopped, !tls.walogs.isEmpty());
+      // stop when we've seen the tablet just beyond our range
+      if (tls.extent.getPrevEndRow() != null && extent.getEndRow() != null && tls.extent.getPrevEndRow().compareTo(extent.getEndRow()) > 0) {
+        break;
+      }
+    }
+    log.debug("chopped " + chopped + " v.chopped " + verify.chopped + " unassigned " + unassigned + " v.unassigned " + verify.unassigned + " verify.total "
+        + verify.total);
+    return chopped == verify.chopped && unassigned == verify.unassigned && unassigned == verify.total;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    ClientOpts opts = new ClientOpts();
+    opts.parseArgs(MergeStats.class.getName(), args);
+    
+    Connector conn = opts.getConnector();
+    Map<String,String> tableIdMap = conn.tableOperations().tableIdMap();
+    for (String table : tableIdMap.keySet()) {
+      String tableId = tableIdMap.get(table);
+      String path = ZooUtil.getRoot(conn.getInstance().getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
+      MergeInfo info = new MergeInfo();
+      if (ZooReaderWriter.getInstance().exists(path)) {
+        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(data, data.length);
+        info.readFields(in);
+      }
+      System.out.println(String.format("%25s  %10s %10s %s", table, info.state, info.operation, info.extent));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MetaDataStateStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/MetaDataStateStore.java
new file mode 100644
index 0000000..6108e10
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MetaDataStateStore.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.master.state.CurrentState;
+import org.apache.accumulo.master.state.MetaDataTableScanner;
+import org.apache.accumulo.master.state.TabletLocationState;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.hadoop.io.Text;
+
+public class MetaDataStateStore extends TabletStateStore {
+  // private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
+  
+  private static final int THREADS = 4;
+  private static final int LATENCY = 1000;
+  private static final int MAX_MEMORY = 200 * 1024 * 1024;
+  
+  final protected Instance instance;
+  final protected CurrentState state;
+  final protected Credentials credentials;
+  final private String targetTableName;
+  
+  protected MetaDataStateStore(Instance instance, Credentials credentials, CurrentState state, String targetTableName) {
+    this.instance = instance;
+    this.state = state;
+    this.credentials = credentials;
+    this.targetTableName = targetTableName;
+  }
+  
+  public MetaDataStateStore(Instance instance, Credentials credentials, CurrentState state) {
+    this(instance, credentials, state, MetadataTable.NAME);
+  }
+  
+  protected MetaDataStateStore(String tableName) {
+    this(HdfsZooInstance.getInstance(), SystemCredentials.get(), null, tableName);
+  }
+  
+  public MetaDataStateStore() {
+    this(MetadataTable.NAME);
+  }
+  
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new MetaDataTableScanner(instance, credentials, MetadataSchema.TabletsSection.getRange(), state);
+  }
+  
+  @Override
+  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    try {
+      for (Assignment assignment : assignments) {
+        Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
+        Text cq = assignment.server.asColumnQualifier();
+        m.put(TabletsSection.CurrentLocationColumnFamily.NAME, cq, assignment.server.asMutationValue());
+        m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, cq);
+        writer.addMutation(m);
+      }
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      try {
+        writer.close();
+      } catch (MutationsRejectedException e) {
+        throw new DistributedStoreException(e);
+      }
+    }
+  }
+  
+  BatchWriter createBatchWriter() {
+    try {
+      return instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createBatchWriter(targetTableName,
+          new BatchWriterConfig().setMaxMemory(MAX_MEMORY).setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
+    } catch (TableNotFoundException e) {
+      // ya, I don't think so
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  @Override
+  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    try {
+      for (Assignment assignment : assignments) {
+        Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
+        m.put(TabletsSection.FutureLocationColumnFamily.NAME, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
+        writer.addMutation(m);
+      }
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      try {
+        writer.close();
+      } catch (MutationsRejectedException e) {
+        throw new DistributedStoreException(e);
+      }
+    }
+  }
+  
+  @Override
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+    
+    BatchWriter writer = createBatchWriter();
+    try {
+      for (TabletLocationState tls : tablets) {
+        Mutation m = new Mutation(tls.extent.getMetadataEntry());
+        if (tls.current != null) {
+          m.putDelete(TabletsSection.CurrentLocationColumnFamily.NAME, tls.current.asColumnQualifier());
+        }
+        if (tls.future != null) {
+          m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, tls.future.asColumnQualifier());
+        }
+        writer.addMutation(m);
+      }
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    } finally {
+      try {
+        writer.close();
+      } catch (MutationsRejectedException e) {
+        throw new DistributedStoreException(e);
+      }
+    }
+  }
+  
+  @Override
+  public String name() {
+    return "Normal Tablets";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/RootTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/RootTabletStateStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/RootTabletStateStore.java
new file mode 100644
index 0000000..b7071fe
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/RootTabletStateStore.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.master.state.CurrentState;
+import org.apache.accumulo.master.state.MetaDataTableScanner;
+import org.apache.accumulo.master.state.TabletLocationState;
+
+public class RootTabletStateStore extends MetaDataStateStore {
+  
+  public RootTabletStateStore(Instance instance, Credentials credentials, CurrentState state) {
+    super(instance, credentials, state, RootTable.NAME);
+  }
+  
+  public RootTabletStateStore() {
+    super(RootTable.NAME);
+  }
+  
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new MetaDataTableScanner(instance, credentials, MetadataSchema.TabletsSection.getRange(), state, RootTable.NAME);
+  }
+  
+  @Override
+  public String name() {
+    return "Non-Root Metadata Tablets";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
new file mode 100644
index 0000000..eff8baa
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+public class SetGoalState {
+  
+  /**
+   * Utility program that will change the goal state for the master from the command line.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1 || MasterGoalState.valueOf(args[0]) == null) {
+      System.err.println("Usage: accumulo " + SetGoalState.class.getName() + " [NORMAL|SAFE_MODE|CLEAN_STOP]");
+      System.exit(-1);
+    }
+    SecurityUtil.serverLogin();
+
+    VolumeManager fs = VolumeManagerImpl.get();
+    Accumulo.waitForZookeeperAndHdfs(fs);
+    ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
+        NodeExistsPolicy.OVERWRITE);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
new file mode 100644
index 0000000..2d1e186
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.master.state.TabletState;
+
+public class TableCounts {
+  int counts[] = new int[TabletState.values().length];
+  
+  public int unassigned() {
+    return counts[TabletState.UNASSIGNED.ordinal()];
+  }
+  
+  public int assigned() {
+    return counts[TabletState.ASSIGNED.ordinal()];
+  }
+  
+  public int assignedToDeadServers() {
+    return counts[TabletState.ASSIGNED_TO_DEAD_SERVER.ordinal()];
+  }
+  
+  public int hosted() {
+    return counts[TabletState.HOSTED.ordinal()];
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
new file mode 100644
index 0000000..12b6ec8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.master.state.TabletState;
+import org.apache.hadoop.io.Text;
+
+public class TableStats {
+  private Map<Text,TableCounts> last = new HashMap<Text,TableCounts>();
+  private Map<Text,TableCounts> next;
+  private long startScan = 0;
+  private long endScan = 0;
+  
+  public synchronized void begin() {
+    next = new HashMap<Text,TableCounts>();
+    startScan = System.currentTimeMillis();
+  }
+  
+  public synchronized void update(Text tableId, TabletState state) {
+    TableCounts counts = next.get(tableId);
+    if (counts == null) {
+      counts = new TableCounts();
+      next.put(tableId, counts);
+    }
+    counts.counts[state.ordinal()]++;
+  }
+  
+  public synchronized void end() {
+    last = next;
+    next = null;
+    endScan = System.currentTimeMillis();
+  }
+  
+  public synchronized Map<Text,TableCounts> getLast() {
+    return last;
+  }
+  
+  public synchronized TableCounts getLast(Text tableId) {
+    TableCounts result = last.get(tableId);
+    if (result == null)
+      return new TableCounts();
+    return result;
+  }
+  
+  public synchronized long getScanTime() {
+    if (endScan <= startScan)
+      return System.currentTimeMillis() - startScan;
+    return endScan - startScan;
+  }
+  
+  public synchronized long lastScanFinished() {
+    return endScan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/TabletMigration.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TabletMigration.java b/server/master/src/main/java/org/apache/accumulo/master/state/TabletMigration.java
new file mode 100644
index 0000000..aab7520
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TabletMigration.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.master.state.TServerInstance;
+
+public class TabletMigration {
+  public KeyExtent tablet;
+  public TServerInstance oldServer;
+  public TServerInstance newServer;
+  
+  public TabletMigration(KeyExtent extent, TServerInstance before, TServerInstance after) {
+    this.tablet = extent;
+    this.oldServer = before;
+    this.newServer = after;
+  }
+  
+  public String toString() {
+    return tablet + ": " + oldServer + " -> " + newServer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TabletStateStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/TabletStateStore.java
new file mode 100644
index 0000000..29f518b
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TabletStateStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.accumulo.master.state.TabletLocationState;
+
+/**
+ * Interface for storing information about tablet assignments. There are three implementations,
+ * chosen by the static helpers below based on the tablet's extent:
+ * 
+ * ZooTabletStateStore: the root tablet (state kept in ZooKeeper). RootTabletStateStore: tablets
+ * of the metadata table. MetaDataStateStore: all other tablets (state kept in the metadata
+ * table).
+ */
+public abstract class TabletStateStore implements Iterable<TabletLocationState> {
+  
+  /**
+   * Identifying name for this tablet state store.
+   */
+  abstract public String name();
+  
+  /**
+   * Scan the information about the tablets covered by this store.
+   */
+  @Override
+  abstract public Iterator<TabletLocationState> iterator();
+  
+  /**
+   * Store the assigned locations in the data store.
+   * 
+   * @param assignments
+   *          tablet-to-server assignments to record as future (planned) locations
+   * @throws DistributedStoreException
+   */
+  abstract public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+  
+  /**
+   * Tablet servers will update the data store with the location when they bring the tablet online
+   * 
+   * @param assignments
+   *          tablet-to-server assignments to record as current locations
+   * @throws DistributedStoreException
+   */
+  abstract public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+  
+  /**
+   * Mark the tablets as having no known or future location.
+   * 
+   * @param tablets
+   *          the tablets' current information
+   * @throws DistributedStoreException
+   */
+  abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+  
+  // Convenience helper: unassign a single tablet, dispatching to the store that owns its extent.
+  // NOTE(review): the root-tablet check precedes isMeta() — presumably isMeta() also matches the
+  // root extent; verify against KeyExtent before reordering.
+  public static void unassign(TabletLocationState tls) throws DistributedStoreException {
+    TabletStateStore store;
+    if (tls.extent.isRootTablet()) {
+      store = new ZooTabletStateStore();
+    } else if (tls.extent.isMeta()) {
+      store = new RootTabletStateStore();
+    } else {
+      store = new MetaDataStateStore();
+    }
+    store.unassign(Collections.singletonList(tls));
+  }
+  
+  // Convenience helper: record a single current-location assignment, using the same extent
+  // dispatch as unassign(TabletLocationState) above.
+  public static void setLocation(Assignment assignment) throws DistributedStoreException {
+    TabletStateStore store;
+    if (assignment.tablet.isRootTablet()) {
+      store = new ZooTabletStateStore();
+    } else if (assignment.tablet.isMeta()) {
+      store = new RootTabletStateStore();
+    } else {
+      store = new MetaDataStateStore();
+    }
+    store.setLocations(Collections.singletonList(assignment));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/ZooStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/ZooStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/ZooStore.java
new file mode 100644
index 0000000..fea6f0e
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/ZooStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+
+/**
+ * DistributedStore backed by ZooKeeper. All paths passed to this API are resolved against a
+ * base path (by default the ZooKeeper root for this instance). Reads go through a ZooCache;
+ * the cache is cleared after every mutation so subsequent reads see fresh data.
+ */
+public class ZooStore implements DistributedStore {
+  
+  private static final Logger log = Logger.getLogger(ZooStore.class);
+  
+  // Base ZooKeeper path; stored without a trailing slash (see constructor).
+  String basePath;
+  
+  // Read-through cache of ZooKeeper data; invalidated on every put/remove.
+  ZooCache cache = new ZooCache();
+  
+  public ZooStore(String basePath) throws IOException {
+    // Normalize: drop a trailing slash so relative() can simply concatenate.
+    if (basePath.endsWith("/"))
+      basePath = basePath.substring(0, basePath.length() - 1);
+    this.basePath = basePath;
+  }
+  
+  public ZooStore() throws IOException {
+    this(ZooUtil.getRoot(HdfsZooInstance.getInstance().getInstanceID()));
+  }
+  
+  /** Fetch the data at the given store-relative path, reading through the cache. */
+  @Override
+  public byte[] get(String path) throws DistributedStoreException {
+    try {
+      return cache.get(relative(path));
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  // Despite the name, this converts a store-relative path into an absolute
+  // ZooKeeper path by prefixing the base path.
+  private String relative(String path) {
+    return basePath + path;
+  }
+  
+  /** List the child node names under the given store-relative path. */
+  @Override
+  public List<String> getChildren(String path) throws DistributedStoreException {
+    try {
+      return cache.getChildren(relative(path));
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  /** Write (create or overwrite) persistent data at the given path, then invalidate the cache. */
+  @Override
+  public void put(String path, byte[] bs) throws DistributedStoreException {
+    try {
+      path = relative(path);
+      ZooReaderWriter.getInstance().putPersistentData(path, bs, NodeExistsPolicy.OVERWRITE);
+      cache.clear();
+      // NOTE(review): new String(bs) uses the platform default charset — confirm stored
+      // values are ASCII/UTF-8 before relying on this log output.
+      log.debug("Wrote " + new String(bs) + " to " + path);
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  /** Recursively delete the node at the given path (if present), then invalidate the cache. */
+  @Override
+  public void remove(String path) throws DistributedStoreException {
+    try {
+      log.debug("Removing " + path);
+      path = relative(path);
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+      if (zoo.exists(path))
+        zoo.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      cache.clear();
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/ZooTabletStateStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/ZooTabletStateStore.java
new file mode 100644
index 0000000..ccc60c8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/ZooTabletStateStore.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TabletLocationState;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.log4j.Logger;
+
+/**
+ * TabletStateStore for the single root tablet, whose assignment state (current, future and
+ * last locations, plus write-ahead log references) lives in ZooKeeper under well-known paths
+ * rather than in the metadata table.
+ */
+public class ZooTabletStateStore extends TabletStateStore {
+  
+  private static final Logger log = Logger.getLogger(ZooTabletStateStore.class);
+  final private DistributedStore store;
+  
+  public ZooTabletStateStore(DistributedStore store) {
+    this.store = store;
+  }
+  
+  public ZooTabletStateStore() throws DistributedStoreException {
+    try {
+      store = new ZooStore();
+    } catch (IOException ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  /**
+   * Returns a single-element iterator describing the root tablet's state, read fresh from
+   * ZooKeeper when next() is called.
+   */
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new Iterator<TabletLocationState>() {
+      boolean finished = false;
+      
+      @Override
+      public boolean hasNext() {
+        return !finished;
+      }
+      
+      @Override
+      public TabletLocationState next() {
+        // NOTE(review): does not throw NoSuchElementException after exhaustion, which
+        // deviates from the Iterator contract; callers here pair next() with hasNext().
+        finished = true;
+        try {
+          byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+          byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION);
+          byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION);
+          
+          TServerInstance currentSession = null;
+          TServerInstance futureSession = null;
+          TServerInstance lastSession = null;
+          
+          if (future != null)
+            futureSession = parse(future);
+          
+          if (last != null)
+            lastSession = parse(last);
+          
+          // A current location supersedes any stale future location.
+          if (current != null) {
+            currentSession = parse(current);
+            futureSession = null;
+          }
+          // Collect the root tablet's write-ahead log sets from the WALOGS children.
+          List<Collection<String>> logs = new ArrayList<Collection<String>>();
+          for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
+            byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
+            if (logInfo != null) {
+              MetadataTableUtil.LogEntry logEntry = new MetadataTableUtil.LogEntry();
+              logEntry.fromBytes(logInfo);
+              logs.add(logEntry.logSet);
+              log.debug("root tablet logSet " + logEntry.logSet);
+            }
+          }
+          TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
+          log.debug("Returning root tablet state: " + result);
+          return result;
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+      
+      @Override
+      public void remove() {
+        throw new NotImplementedException();
+      }
+    };
+  }
+  
+  /**
+   * Parse a "host:port|session" location entry. Returns null when no session is present
+   * (the 1.2-era format), which callers treat as "no location".
+   */
+  protected TServerInstance parse(byte[] current) {
+    String str = new String(current);
+    String[] parts = str.split("[|]", 2);
+    InetSocketAddress address = AddressUtil.parseAddress(parts[0], 0);
+    if (parts.length > 1 && parts[1] != null && parts[1].length() > 0) {
+      return new TServerInstance(address, parts[1]);
+    } else {
+      // a 1.2 location specification: DO NOT WANT
+      return null;
+    }
+  }
+  
+  /**
+   * Record the planned (future) location of the root tablet. Exactly one assignment, for the
+   * root extent, is accepted; fails if a current location is already set.
+   */
+  @Override
+  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    if (assignments.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    Assignment assignment = assignments.iterator().next();
+    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
+    Iterator<TabletLocationState> currentIter = iterator();
+    TabletLocationState current = currentIter.next();
+    if (current.current != null) {
+      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
+    }
+    store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
+  }
+  
+  /**
+   * Record the current location of the root tablet, confirming it matches the previously
+   * recorded future location. Exactly one assignment, for the root extent, is accepted.
+   */
+  @Override
+  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    if (assignments.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    Assignment assignment = assignments.iterator().next();
+    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
+    Iterator<TabletLocationState> currentIter = iterator();
+    TabletLocationState current = currentIter.next();
+    if (current.current != null) {
+      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
+    }
+    // Fix: guard against current.future being null (no future location recorded) before
+    // dereferencing it — previously this line threw NullPointerException in that case.
+    if (current.future == null || !current.future.equals(assignment.server)) {
+      throw new DistributedStoreException("Root tablet is already assigned to " + current.future);
+    }
+    store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes());
+    store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
+    // Make the following unnecessary by making the entire update atomic
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("Put down root tablet location");
+  }
+  
+  /**
+   * Clear both the current and future locations of the root tablet. Exactly one tablet,
+   * the root extent, is accepted.
+   */
+  @Override
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+    if (tablets.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    TabletLocationState tls = tablets.iterator().next();
+    if (tls.extent.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    store.remove(RootTable.ZROOT_TABLET_LOCATION);
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("unassign root tablet location");
+  }
+  
+  @Override
+  public String name() {
+    return "Root Tablet";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
new file mode 100644
index 0000000..6d75a45
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/*
+ * Bulk import makes requests of tablet servers, and those requests can take a
+ * long time. Our communications to the tablet server may fail, so we won't know
+ * the status of the request. The master will repeat failed requests so now
+ * there are multiple requests to the tablet server. The tablet server will not
+ * execute the request multiple times, so long as the marker it wrote in the
+ * metadata table stays there. The master needs to know when all requests have
+ * finished so it can remove the markers. Did it start? Did it finish? We can see
+ * that *a* request completed by seeing the flag written into the metadata
+ * table, but we won't know if some other rogue thread is still waiting to start
+ * a thread and repeat the operation.
+ * 
+ * The master can ask the tablet server if it has any requests still running.
+ * Except the tablet server might have some thread about to start a request, but
+ * before it has made any bookkeeping about the request. To prevent problems
+ * like this, an Arbitrator is used. Before starting any new request, the tablet
+ * server checks the Arbitrator to see if the request is still valid.
+ * 
+ */
+
+/**
+ * FATE step that starts a bulk import: takes the table read lock, validates the error
+ * directory (must exist, be a directory, and be empty), starts bulk arbitration, moves the
+ * input files into a fresh per-table bulk directory, and hands off to LoadFiles. See the
+ * file-level comment above for why arbitration is needed.
+ */
+public class BulkImport extends MasterRepo {
+  public static final String FAILURES_TXT = "failures.txt";
+  
+  private static final long serialVersionUID = 1L;
+  
+  private static final Logger log = Logger.getLogger(BulkImport.class);
+  
+  private String tableId;
+  private String sourceDir;
+  private String errorDir;
+  private boolean setTime;
+  
+  public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
+    this.tableId = tableId;
+    this.sourceDir = sourceDir;
+    this.errorDir = errorDir;
+    this.setTime = setTime;
+  }
+  
+  // Returns 0 when ready to run; a non-zero value is returned when the lock or a
+  // directory reservation is unavailable, so this step is attempted again later.
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (!Utils.getReadLock(tableId, tid).tryLock())
+      return 100;
+    
+    Instance instance = HdfsZooInstance.getInstance();
+    Tables.clearCache(instance);
+    if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
+      // Reserve the source dir first; only try the error dir once that succeeded (0).
+      long reserve1, reserve2;
+      reserve1 = reserve2 = Utils.reserveHdfsDirectory(sourceDir, tid);
+      if (reserve1 == 0)
+        reserve2 = Utils.reserveHdfsDirectory(errorDir, tid);
+      return reserve2;
+    } else {
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
+    }
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    log.debug(" tid " + tid + " sourceDir " + sourceDir);
+    
+    Utils.getReadLock(tableId, tid).lock();
+    
+    // check that the error directory exists and is empty
+    VolumeManager fs = master.getFileSystem();
+    
+    Path errorPath = new Path(errorDir);
+    FileStatus errorStatus = null;
+    try {
+      errorStatus = fs.getFileStatus(errorPath);
+    } catch (FileNotFoundException ex) {
+      // ignored: errorStatus stays null and is rejected below
+    }
+    if (errorStatus == null)
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " does not exist");
+    if (!errorStatus.isDir())
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " is not a directory");
+    if (fs.listStatus(errorPath).length != 0)
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " is not empty");
+    
+    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    
+    // move the files into the directory
+    try {
+      String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
+      log.debug(" tid " + tid + " bulkDir " + bulkDir);
+      return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
+    } catch (IOException ex) {
+      log.error("error preparing the bulk import directory", ex);
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": "
+          + ex);
+    }
+  }
+  
+  // Create a new, uniquely named bulk directory under the table directory that shares a
+  // filesystem/namespace with the source directory; retries until mkdirs succeeds.
+  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+    String tableDir = null;
+    loop: for (String dir : fs.getFileSystems().keySet()) {
+      if (this.sourceDir.startsWith(dir)) {
+        for (String path : ServerConstants.getTablesDirs()) {
+          if (path.startsWith(dir)) {
+            tableDir = path;
+            break loop;
+          }
+        }
+        break;
+      }
+    }
+    if (tableDir == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+    Path directory = new Path(tableDir + "/" + tableId);
+    fs.mkdirs(directory);
+    
+    // only one should be able to create the lock file
+    // the purpose of the lock file is to avoid a race
+    // condition between the call to fs.exists() and
+    // fs.mkdirs()... if only hadoop had a mkdir() function
+    // that failed when the dir existed
+    
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    
+    while (true) {
+      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
+      if (fs.exists(newBulkDir)) // sanity check
+        throw new IllegalStateException("Dir exist when it should not " + newBulkDir);
+      if (fs.mkdirs(newBulkDir))
+        return newBulkDir;
+      log.warn("Failed to create " + newBulkDir + " for unknown reason");
+      
+      UtilWaitThread.sleep(3000);
+    }
+  }
+  
+  // Mark the bulk load in progress, then move every acceptable input file into the new
+  // bulk directory under a fresh "I..." name. Files with unrecognized extensions or
+  // malformed map files are skipped with a warning. Returns the new bulk directory path.
+  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
+    Path bulkDir = createNewBulkDir(fs, tableId);
+    
+    MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
+    
+    Path dirPath = new Path(dir);
+    FileStatus[] mapFiles = fs.listStatus(dirPath);
+    
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    
+    for (FileStatus fileStatus : mapFiles) {
+      String sa[] = fileStatus.getPath().getName().split("\\.");
+      String extension = "";
+      if (sa.length > 1) {
+        extension = sa[sa.length - 1];
+        
+        if (!FileOperations.getValidExtensions().contains(extension)) {
+          log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
+          continue;
+        }
+      } else {
+        // assume it is a map file
+        extension = Constants.MAPFILE_EXTENSION;
+      }
+      
+      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
+        // Map files are directories containing a data file; reject anything else.
+        if (!fileStatus.isDir()) {
+          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+          continue;
+        }
+        
+        if (fileStatus.getPath().getName().equals("_logs")) {
+          log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
+          continue;
+        }
+        try {
+          FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
+          if (dataStatus.isDir()) {
+            log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+            continue;
+          }
+        } catch (FileNotFoundException fnfe) {
+          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+          continue;
+        }
+      }
+      
+      String newName = "I" + namer.getNextName() + "." + extension;
+      Path newPath = new Path(bulkDir, newName);
+      try {
+        fs.rename(fileStatus.getPath(), newPath);
+        log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
+      } catch (IOException E1) {
+        // NOTE(review): rename failure is logged and swallowed; the file is silently
+        // left behind in the source directory — confirm this is intended.
+        log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
+      }
+    }
+    return bulkDir.toString();
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // unreserve source/error directories
+    Utils.unreserveHdfsDirectory(sourceDir, tid);
+    Utils.unreserveHdfsDirectory(errorDir, tid);
+    Utils.getReadLock(tableId, tid).unlock();
+  }
+}
+
+/**
+ * Final step of a bulk import: removes the in-progress flag and per-file metadata markers,
+ * releases the HDFS directory reservations and table read lock, and closes the arbitration
+ * transaction. Returning null ends the FATE operation.
+ */
+class CleanUpBulkImport extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
+  
+  private String tableId;
+  private String source; // source directory reserved in BulkImport.isReady
+  private String bulk;   // per-table bulk directory the files were moved into
+  private String error;  // error directory reserved in BulkImport.isReady
+  
+  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    log.debug("removing the bulk processing flag file in " + bulk);
+    Path bulkDir = new Path(bulk);
+    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
+    // presumably schedules the bulk directory for deletion — see MetadataTableUtil.addDeleteEntry
+    MetadataTableUtil.addDeleteEntry(tableId, "/" + bulkDir.getName());
+    log.debug("removing the metadata table markers for loaded files");
+    Connector conn = master.getConnector();
+    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
+    log.debug("releasing HDFS reservations for " + source + " and " + error);
+    Utils.unreserveHdfsDirectory(source, tid);
+    Utils.unreserveHdfsDirectory(error, tid);
+    Utils.getReadLock(tableId, tid).unlock();
+    log.debug("completing bulk import transaction " + tid);
+    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return null;
+  }
+}
+
+/**
+ * Stops bulk-import arbitration for this transaction, then proceeds to the CopyFailed step,
+ * passing along the source/bulk/error directories unchanged.
+ */
+class CompleteBulkImport extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return new CopyFailed(tableId, source, bulk, error);
+  }
+}
+
+class CopyFailed extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  /** Stores the table id and the source/bulk/error directories for the copy-failed step. */
+  public CopyFailed(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  // Ready (returns 0) only when every online tablet server reports no active work for
+  // this transaction; otherwise returns 500 so the check is repeated later.
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    Set<TServerInstance> finished = new HashSet<TServerInstance>();
+    Set<TServerInstance> running = master.onlineTabletServers();
+    for (TServerInstance server : running) {
+      try {
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
+          finished.add(server);
+      } catch (TException ex) {
+        // Best effort: an unreachable server just stays out of 'finished' this round.
+        // NOTE(review): 'log' is not declared in CopyFailed within this view — verify the
+        // field exists (or resolves) in the full file, otherwise this cannot compile.
+        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
+      }
+    }
+    if (finished.containsAll(running))
+      return 0;
+    return 500;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    // This needs to execute after the arbiter is stopped
+    
+    VolumeManager fs = master.getFileSystem();
+    
+    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
+      return new CleanUpBulkImport(tableId, source, bulk, error);
+    
+    HashMap<String,String> failures = new HashMap<String,String>();
+    HashMap<String,String> loadedFailures = new HashMap<String,String>();
+    
+    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
+    BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        Path path = new Path(line);
+        if (!fs.exists(new Path(error, path.getName())))
+          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+      }
+    } finally {
+      failFile.close();
+    }
+    
+    /*
+     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
+     * have no loaded markers.
+     */
+    
+    // determine which failed files were loaded
+    Connector conn = master.getConnector();
+    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+    
+    for (Entry<Key,Value> entry : mscanner) {
+      if (Long.parseLong(entry.getValue().toString()) == tid) {
+        String loadedFile = entry.getKey().getColumnQualifier().toString();
+        String absPath = failures.remove(loadedFile);
+        if (absPath != null) {
+          loadedFailures.put(loadedFile, absPath);
+        }
+      }
+    }
+    
+    // move failed files that were not loaded
+    for (String failure : failures.values()) {
+      Path orig = new Path(failure);
+      Path dest = new Path(error, orig.getName());
+      fs.rename(orig, dest);
+      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
+    }
+    
+    if (loadedFailures.size() > 0) {
+      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+          + Constants.ZBULK_FAILED_COPYQ);
+      
+      HashSet<String> workIds = new HashSet<String>();
+      
+      for (String failure : loadedFailures.values()) {
+        Path orig = new Path(failure);
+        Path dest = new Path(error, orig.getName());
+        
+        if (fs.exists(dest))
+          continue;
+        
+        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+        workIds.add(orig.getName());
+        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+      }
+      
+      bifCopyQueue.waitUntilDone(workIds);
+    }
+    
+    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
+    return new CleanUpBulkImport(tableId, source, bulk, error);
+  }
+  
+}
+
+/**
+ * FATE step that asks random tablet servers to bulk import each candidate file, retrying
+ * failed files up to Property.MASTER_BULK_RETRIES times.  Files that still fail after all
+ * attempts are recorded in failures.txt in the error directory for later handling by the
+ * failed-copy step.
+ */
+class LoadFiles extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  // Shared by all LoadFiles repos; created lazily, guarded by the class lock
+  // (see initializeThreadPool).  The previously present empty static initializer
+  // was dead code and has been removed.
+  private static ExecutorService threadPool = null;
+  // NOTE(review): logs under BulkImport.class, apparently so all bulk import steps
+  // share one logger category - confirm this is intentional.
+  private static final Logger log = Logger.getLogger(BulkImport.class);
+  
+  private String tableId;
+  private String source;    // user-supplied source directory
+  private String bulk;      // bulk directory holding the files to load
+  private String errorDir;  // directory where failures.txt is written
+  private boolean setTime;  // whether tablet servers should assign entry timestamps
+  
+  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.errorDir = errorDir;
+    this.setTime = setTime;
+  }
+  
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    // Files cannot be assigned without tablet servers; poll again in 500 ms.
+    if (master.onlineTabletServers().size() == 0)
+      return 500;
+    return 0;
+  }
+  
+  /**
+   * Lazily creates the shared thread pool.  BUGFIX: this must be a static synchronized
+   * method because the field it guards is static - the previous instance-level
+   * synchronization let two LoadFiles instances race and create two pools.
+   */
+  static synchronized void initializeThreadPool(Master master) {
+    if (threadPool == null) {
+      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
+      // Let idle workers die off between bulk imports.
+      pool.allowCoreThreadTimeOut(true);
+      threadPool = new TraceExecutorService(pool);
+    }
+  }
+  
+  @Override
+  public Repo<Master> call(final long tid, final Master master) throws Exception {
+    initializeThreadPool(master);
+    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
+    VolumeManager fs = master.getFileSystem();
+    List<FileStatus> files = new ArrayList<FileStatus>();
+    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
+      files.add(entry);
+    }
+    log.debug("tid " + tid + " importing " + files.size() + " files");
+    
+    // Probe the error directory for writability before doing any work; a retry of
+    // this step may find the probe file left over, so clear it and probe again.
+    Path writable = new Path(this.errorDir, ".iswritable");
+    if (!fs.createNewFile(writable)) {
+      fs.delete(writable);
+      if (!fs.createNewFile(writable))
+        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
+            "Unable to write to " + this.errorDir);
+    }
+    fs.delete(writable);
+    
+    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
+    for (FileStatus f : files)
+      filesToLoad.add(f.getPath().toString());
+    
+    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+      
+      if (master.onlineTabletServers().size() == 0)
+        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+      
+      while (master.onlineTabletServers().size() == 0) {
+        UtilWaitThread.sleep(500);
+      }
+      
+      // Use the threadpool to assign files one-at-a-time to the server
+      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+      for (final String file : filesToLoad) {
+        results.add(threadPool.submit(new Callable<List<String>>() {
+          @Override
+          public List<String> call() {
+            List<String> failures = new ArrayList<String>();
+            ClientService.Client client = null;
+            String server = null;
+            try {
+              // get a connection to a random tablet server, do not prefer cached connections because
+              // this is running on the master and there are lots of connections to tablet servers
+              // serving the !METADATA tablets
+              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
+              client = pair.getSecond();
+              server = pair.getFirst();
+              List<String> attempt = Collections.singletonList(file);
+              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
+              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
+                  errorDir, setTime);
+              if (fail.isEmpty()) {
+                loaded.add(file);
+              } else {
+                failures.addAll(fail);
+              }
+            } catch (Exception ex) {
+              // Best effort: a failed RPC leaves the file in filesToLoad for the next attempt.
+              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+            } finally {
+              ServerClient.close(client);
+            }
+            return failures;
+          }
+        }));
+      }
+      Set<String> failures = new HashSet<String>();
+      for (Future<List<String>> f : results)
+        failures.addAll(f.get());
+      // Anything not successfully loaded stays in filesToLoad for the next attempt.
+      filesToLoad.removeAll(loaded);
+      if (filesToLoad.size() > 0) {
+        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+        UtilWaitThread.sleep(100);
+      }
+    }
+    
+    // Record the files that never loaded; the failed-copy step reads this file.
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
+      }
+    } finally {
+      out.close();
+    }
+    
+    // return the next step, which will perform cleanup
+    return new CompleteBulkImport(tableId, source, bulk, errorDir);
+  }
+  
+  /**
+   * Renders at most {@code max} elements of a possibly long collection for log
+   * messages, e.g. "[a, b, ...]".  BUGFIX: the previous implementation threw
+   * StringIndexOutOfBoundsException on an empty collection and left a trailing
+   * ", " when the collection size equaled {@code max}.
+   */
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuilder result = new StringBuilder("[");
+    int i = 0;
+    for (Object obj : potentiallyLongList) {
+      if (i > 0)
+        result.append(", ");
+      if (i >= max) {
+        result.append("...");
+        break;
+      }
+      result.append(obj);
+      i++;
+    }
+    result.append("]");
+    return result.toString();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
new file mode 100644
index 0000000..dd4c229
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Final step of cancelling a compaction: releases the table read lock that
+ * CancelCompactions reserved.
+ */
+class FinishCancelCompaction extends MasterRepo {
+  private static final long serialVersionUID = 1L;
+
+  private final String tableId;
+  
+  public FinishCancelCompaction(String tableId) {
+    this.tableId = tableId;
+  }
+
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // Nothing left to do but release the read lock taken by the previous step.
+    Utils.getReadLock(tableId, tid).unlock();
+    return null;
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // call() changes no persistent state, so there is nothing to roll back.
+  }
+}
+
+/**
+ * FATE operation that cancels in-progress user compactions on a table by advancing
+ * the table's compaction-cancel id in zookeeper to the current compaction id.
+ */
+public class CancelCompactions extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  private final String tableId;
+  
+  public CancelCompactions(String tableId) {
+    this.tableId = tableId;
+  }
+
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    // Take a read lock so the table is not deleted/cloned out from under us.
+    return Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // Both nodes live under the table's zookeeper directory; build the prefix once.
+    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId;
+    String zCompactID = zTablePath + Constants.ZTABLE_COMPACT_ID;
+    String zCancelID = zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID;
+    
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    
+    // The compact-id node's value is comma separated; the current compaction id is
+    // the first token.  (tokens[0] is already a String - the redundant
+    // new String(...) wrapper was removed.)
+    byte[] currentValue = zoo.getData(zCompactID, null);
+    String cvs = new String(currentValue);
+    String[] tokens = cvs.split(",");
+    final long compactId = Long.parseLong(tokens[0]);
+    
+    // Advance the cancel id to the current compaction id, never moving it backwards.
+    zoo.mutate(zCancelID, null, null, new Mutator() {
+      @Override
+      public byte[] mutate(byte[] currentValue) throws Exception {
+        long cid = Long.parseLong(new String(currentValue));
+        
+        if (cid < compactId)
+          return (compactId + "").getBytes();
+        else
+          return (cid + "").getBytes();
+      }
+    });
+
+    return new FinishCancelCompaction(tableId);
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // Release the read lock reserved in isReady().
+    Utils.unreserveTable(tableId, tid, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
new file mode 100644
index 0000000..697c15e
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.log4j.Logger;
+
+/**
+ * FATE operation that brings a table online or takes it offline.
+ */
+public class ChangeTableState extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  private String tableId;
+  private TableOperation top;
+  
+  public ChangeTableState(String tableId, TableOperation top) {
+    // Only the two state-transition operations make sense for this repo.
+    if (top != TableOperation.ONLINE && top != TableOperation.OFFLINE)
+      throw new IllegalArgumentException(top.toString());
+    this.tableId = tableId;
+    this.top = top;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    // reserve the table so that this op does not run concurrently with create, clone, or delete table
+    return Utils.reserveTable(tableId, tid, true, true, top);
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    TableState newState = (top == TableOperation.OFFLINE) ? TableState.OFFLINE : TableState.ONLINE;
+    
+    TableManager.getInstance().transitionTableState(tableId, newState);
+    Utils.unreserveTable(tableId, tid, true);
+    Logger.getLogger(ChangeTableState.class).debug("Changed table state " + tableId + " " + newState);
+    env.getEventCoordinator().event("Set table state of %s to %s", tableId, newState);
+    return null;
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    Utils.unreserveTable(tableId, tid, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
new file mode 100644
index 0000000..86a927c
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Serializable bag of state shared by every step of a clone-table FATE operation.
+ */
+class CloneInfo implements Serializable {
+  
+  private static final long serialVersionUID = 1L;
+  
+  String srcTableId;   // id of the table being cloned
+  String tableName;    // name of the new (cloned) table
+  String tableId;      // id assigned to the new table (set in CloneTable.call)
+  Map<String,String> propertiesToSet;   // table properties to set on the clone
+  Set<String> propertiesToExclude;      // source properties excluded from the clone
+  
+  public String user;  // requesting user; granted all table permissions on the clone
+}
+
+/**
+ * Last step of cloning a table: brings the clone online, releases the locks held
+ * on both tables, and announces the new table.
+ */
+class FinishCloneTable extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private final CloneInfo cloneInfo;
+  
+  public FinishCloneTable(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // Tablet directories are deliberately NOT created here: clone directories must be
+    // unique because they occupy a different namespace than normal tablet directories,
+    // some clones never create files, and tablets make directories on demand - so
+    // creating them now would only consume namenode space.
+    TableManager.getInstance().transitionTableState(cloneInfo.tableId, TableState.ONLINE);
+    
+    // Release the read lock on the source and the write lock on the clone.
+    Utils.unreserveTable(cloneInfo.srcTableId, tid, false);
+    Utils.unreserveTable(cloneInfo.tableId, tid, true);
+    
+    environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName, cloneInfo.srcTableId);
+    Logger.getLogger(FinishCloneTable.class).debug("Cloned table " + cloneInfo.srcTableId + " " + cloneInfo.tableId + " " + cloneInfo.tableName);
+    
+    return null;
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {}
+  
+}
+
+/**
+ * Copies the source table's metadata entries to the new table id.
+ */
+class CloneMetadata extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private final CloneInfo cloneInfo;
+  
+  public CloneMetadata(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    Logger.getLogger(CloneMetadata.class).info(
+        String.format("Cloning %s with tableId %s from srcTableId %s", cloneInfo.tableName, cloneInfo.tableId, cloneInfo.srcTableId));
+    Instance instance = HdfsZooInstance.getInstance();
+    // Clear out any metadata entries for the new table id first, in case this step
+    // died part way through on a previous execution and is running again.
+    MetadataTableUtil.deleteTable(cloneInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+    MetadataTableUtil.cloneTable(instance, cloneInfo.srcTableId, cloneInfo.tableId);
+    return new FinishCloneTable(cloneInfo);
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // Remove whatever metadata a partial clone may have written.
+    MetadataTableUtil.deleteTable(cloneInfo.tableId, false, SystemCredentials.get(), environment.getMasterLock());
+  }
+  
+}
+
+/**
+ * Records the cloned table's name, id, and adjusted properties in zookeeper.
+ */
+class CloneZookeeper extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private final CloneInfo cloneInfo;
+  
+  public CloneZookeeper(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    // Hold a write lock on the new table id for the rest of the operation.
+    return Utils.reserveTable(cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // Serialize table-name registration so two creates/clones cannot claim one name.
+    Utils.tableNameLock.lock();
+    try {
+      Instance instance = HdfsZooInstance.getInstance();
+      Utils.checkTableDoesNotExist(instance, cloneInfo.tableName, cloneInfo.tableId, TableOperation.CLONE);
+      
+      // Write the clone's name, id and property overrides/exclusions to zookeeper.
+      TableManager.getInstance().cloneTable(cloneInfo.srcTableId, cloneInfo.tableId, cloneInfo.tableName, cloneInfo.propertiesToSet,
+          cloneInfo.propertiesToExclude, NodeExistsPolicy.OVERWRITE);
+      Tables.clearCache(instance);
+      return new CloneMetadata(cloneInfo);
+    } finally {
+      Utils.tableNameLock.unlock();
+    }
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    Instance instance = HdfsZooInstance.getInstance();
+    TableManager.getInstance().removeTable(cloneInfo.tableId);
+    Utils.unreserveTable(cloneInfo.tableId, tid, true);
+    Tables.clearCache(instance);
+  }
+  
+}
+
+class ClonePermissions extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private CloneInfo cloneInfo;
+  
+  public ClonePermissions(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // give all table permissions to the creator
+    for (TablePermission permission : TablePermission.values()) {
+      try {
+        AuditedSecurityOperation.getInstance().grantTablePermission(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.user,
+            cloneInfo.tableId, permission);
+      } catch (ThriftSecurityException e) {
+        Logger.getLogger(FinishCloneTable.class).error(e.getMessage(), e);
+        throw e;
+      }
+    }
+    
+    // setup permissions in zookeeper before table info in zookeeper
+    // this way concurrent users will not get a spurious pemission denied
+    // error
+    return new CloneZookeeper(cloneInfo);
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(environment.getInstance()), cloneInfo.tableId);
+  }
+}
+
+/**
+ * Entry point of the clone-table FATE operation: reserves the source table and
+ * allocates an id for the clone, then hands off to ClonePermissions.
+ */
+public class CloneTable extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private final CloneInfo cloneInfo;
+  
+  public CloneTable(String user, String srcTableId, String tableName, Map<String,String> propertiesToSet, Set<String> propertiesToExclude) {
+    cloneInfo = new CloneInfo();
+    cloneInfo.user = user;
+    cloneInfo.srcTableId = srcTableId;
+    cloneInfo.tableName = tableName;
+    cloneInfo.propertiesToSet = propertiesToSet;
+    cloneInfo.propertiesToExclude = propertiesToExclude;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    // Hold a read lock on the source table for the duration of the clone.
+    return Utils.reserveTable(cloneInfo.srcTableId, tid, false, true, TableOperation.CLONE);
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // Allocating a table id must be serialized across concurrent table operations.
+    Utils.idLock.lock();
+    try {
+      Instance instance = HdfsZooInstance.getInstance();
+      cloneInfo.tableId = Utils.getNextTableId(cloneInfo.tableName, instance);
+      return new ClonePermissions(cloneInfo);
+    } finally {
+      Utils.idLock.unlock();
+    }
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    Utils.unreserveTable(cloneInfo.srcTableId, tid, false);
+  }
+  
+}


Mime
View raw message