accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [53/53] [abbrv] git commit: ACCUMULO-658 Move tests and resources to correct modules
Date Fri, 06 Sep 2013 01:49:21 GMT
ACCUMULO-658 Move tests and resources to correct modules


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/162bd40d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/162bd40d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/162bd40d

Branch: refs/heads/ACCUMULO-210
Commit: 162bd40d081a07265ec4ec2f57f1b3c763499899
Parents: cc9f575
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Tue Aug 6 11:41:15 2013 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Thu Sep 5 21:47:33 2013 -0400

----------------------------------------------------------------------
 minicluster/pom.xml                             |   8 +-
 server/base/pom.xml                             |   8 +
 .../apache/accumulo/master/LiveTServerSet.java  | 398 ++++++++++++
 .../apache/accumulo/server/util/Initialize.java | 522 ++++++++++++++++
 .../constraints/MetadataConstraints.java        | 315 ++++++++++
 .../iterators/MetadataBulkLoadFilter.java       |  91 +++
 .../server/client/BulkImporterTest.java         | 152 +++++
 .../constraints/MetadataConstraintsTest.java    | 240 ++++++++
 .../server/data/ServerMutationTest.java         |  79 +++
 .../iterators/MetadataBulkLoadFilterTest.java   | 144 +++++
 .../server/security/SystemCredentialsTest.java  |  65 ++
 .../security/handler/ZKAuthenticatorTest.java   |  87 +++
 .../apache/accumulo/server/util/CloneTest.java  | 375 ++++++++++++
 .../accumulo/server/util/DefaultMapTest.java    |  47 ++
 .../server/util/TabletIteratorTest.java         | 107 ++++
 .../server/util/time/BaseRelativeTimeTest.java  |  89 +++
 .../base/src/test/resources/accumulo-site.xml   |  32 +
 server/base/src/test/resources/log4j.properties |  21 +
 .../apache/accumulo/master/LiveTServerSet.java  | 398 ------------
 server/server/pom.xml                           | 136 -----
 .../accumulo/server/metanalysis/FilterMeta.java |  92 ---
 .../accumulo/server/metanalysis/FindTablet.java |  66 --
 .../accumulo/server/metanalysis/IndexMeta.java  | 176 ------
 .../server/metanalysis/LogFileInputFormat.java  | 116 ----
 .../server/metanalysis/LogFileOutputFormat.java |  66 --
 .../server/metanalysis/PrintEvents.java         |  99 ---
 .../server/metanalysis/package-info.java        |  34 --
 .../server/util/FindOfflineTablets.java         | 132 ----
 .../apache/accumulo/server/util/Initialize.java | 522 ----------------
 .../src/main/resources/randomwalk/Basic.xml     |  37 --
 .../src/main/resources/randomwalk/Simple.xml    |  43 --
 .../src/main/resources/randomwalk/module.xsd    |  69 ---
 .../server/client/BulkImporterTest.java         | 152 -----
 .../constraints/MetadataConstraintsTest.java    | 240 --------
 .../server/data/ServerMutationTest.java         |  79 ---
 .../iterators/MetadataBulkLoadFilterTest.java   | 144 -----
 .../server/security/SystemCredentialsTest.java  |  65 --
 .../security/handler/ZKAuthenticatorTest.java   |  87 ---
 .../tabletserver/CheckTabletMetadataTest.java   | 122 ----
 .../server/tabletserver/InMemoryMapTest.java    | 492 ---------------
 .../tabletserver/log/MultiReaderTest.java       | 146 -----
 .../tabletserver/log/SortedLogRecoveryTest.java | 602 -------------------
 .../apache/accumulo/server/util/CloneTest.java  | 375 ------------
 .../accumulo/server/util/DefaultMapTest.java    |  47 --
 .../server/util/TabletIteratorTest.java         | 107 ----
 .../server/util/time/BaseRelativeTimeTest.java  |  89 ---
 .../accumulo/tserver/logger/LogFileTest.java    | 124 ----
 .../server/src/test/resources/accumulo-site.xml |  32 -
 .../server/src/test/resources/log4j.properties  |  21 -
 .../constraints/MetadataConstraints.java        | 315 ----------
 .../iterators/MetadataBulkLoadFilter.java       |  91 ---
 .../tabletserver/CheckTabletMetadataTest.java   | 122 ++++
 .../server/tabletserver/InMemoryMapTest.java    | 492 +++++++++++++++
 .../tabletserver/log/MultiReaderTest.java       | 146 +++++
 .../tabletserver/log/SortedLogRecoveryTest.java | 602 +++++++++++++++++++
 .../accumulo/tserver/logger/LogFileTest.java    | 124 ++++
 server/utils/pom.xml                            |   6 +-
 .../accumulo/server/metanalysis/FilterMeta.java |  92 +++
 .../accumulo/server/metanalysis/FindTablet.java |  66 ++
 .../accumulo/server/metanalysis/IndexMeta.java  | 176 ++++++
 .../server/metanalysis/LogFileInputFormat.java  | 116 ++++
 .../server/metanalysis/LogFileOutputFormat.java |  66 ++
 .../server/metanalysis/PrintEvents.java         |  99 +++
 .../server/metanalysis/package-info.java        |  34 ++
 .../server/util/FindOfflineTablets.java         | 132 ++++
 test/pom.xml                                    |   6 +-
 test/src/main/resources/randomwalk/Basic.xml    |  37 ++
 test/src/main/resources/randomwalk/Simple.xml   |  43 ++
 test/src/main/resources/randomwalk/module.xsd   |  69 +++
 69 files changed, 5200 insertions(+), 5324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/minicluster/pom.xml
----------------------------------------------------------------------
diff --git a/minicluster/pom.xml b/minicluster/pom.xml
index c35e7a5..f984a42 100644
--- a/minicluster/pom.xml
+++ b/minicluster/pom.xml
@@ -35,15 +35,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-core</artifactId>
+      <artifactId>accumulo-master</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-server</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-start</artifactId>
+      <artifactId>accumulo-tserver</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
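
(With the server module split, minicluster now depends on the new accumulo-master
and accumulo-tserver artifacts; accumulo-core and accumulo-start are presumably no
longer declared here because they arrive transitively through those modules.)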

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/pom.xml
----------------------------------------------------------------------
diff --git a/server/base/pom.xml b/server/base/pom.xml
index 5fc94cc..409e884 100644
--- a/server/base/pom.xml
+++ b/server/base/pom.xml
@@ -124,4 +124,12 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  <build>
+    <testResources>
+      <testResource>
+        <filtering>true</filtering>
+        <directory>src/test/resources</directory>
+      </testResource>
+    </testResources>
+  </build>
 </project>
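
(Declaring src/test/resources with <filtering>true</filtering> makes Maven run the
copied test resources through property substitution, so any ${...} placeholders in
the new test accumulo-site.xml are resolved at build time instead of being copied
verbatim.)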

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
new file mode 100644
index 0000000..59ab8c8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+public class LiveTServerSet implements Watcher {
+  
+  public interface Listener {
+    void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
+  }
+  
+  private static final Logger log = Logger.getLogger(LiveTServerSet.class);
+  
+  private final Listener cback;
+  private final Instance instance;
+  private final AccumuloConfiguration conf;
+  private ZooCache zooCache;
+  
+  public class TServerConnection {
+    private final InetSocketAddress address;
+    
+    public TServerConnection(InetSocketAddress addr) throws TException {
+      address = addr;
+    }
+    
+    private String lockString(ZooLock mlock) {
+      return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
+    }
+    
+    public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
+      
+      if (usePooledConnection)
+        throw new UnsupportedOperationException();
+      
+      TTransport transport = ThriftUtil.createTransport(address, conf);
+      
+      try {
+        TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
+        return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
+      } finally {
+        if (transport != null)
+          transport.close();
+      }
+    }
+    
+    public void halt(ZooLock lock) throws TException, ThriftSecurityException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void fastHalt(ZooLock lock) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void chop(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
+            ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    public boolean isActive(long tid) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        return client.isActive(Tracer.traceInfo(), tid);
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+  }
+  
+  static class TServerInfo {
+    TServerConnection connection;
+    TServerInstance instance;
+    
+    TServerInfo(TServerInstance instance, TServerConnection connection) {
+      this.connection = connection;
+      this.instance = instance;
+    }
+  }
+  
+  // The set of active tservers with locks, indexed by their name in zookeeper
+  private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
+  // as above, indexed by TServerInstance
+  private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
+  
+  // The set of entries in zookeeper without locks, and the first time each was noticed
+  private Map<String,Long> locklessServers = new HashMap<String,Long>();
+  
+  public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
+    this.cback = cback;
+    this.instance = instance;
+    this.conf = conf;
+    
+  }
+  
+  public synchronized ZooCache getZooCache() {
+    if (zooCache == null)
+      zooCache = new ZooCache(this);
+    return zooCache;
+  }
+  
+  public synchronized void startListeningForTabletServerChanges() {
+    scanServers();
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        scanServers();
+      }
+    }, 0, 5000);
+  }
+  
+  public synchronized void scanServers() {
+    try {
+      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+      
+      final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+      
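+      // consider the union of servers we already track and the children currently in
+      // zookeeper, so both departures and new arrivals are detected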
+      HashSet<String> all = new HashSet<String>(current.keySet());
+      all.addAll(getZooCache().getChildren(path));
+      
+      locklessServers.keySet().retainAll(all);
+      
+      for (String zPath : all) {
+        checkServer(updates, doomed, path, zPath);
+      }
+      
+      // log.debug("Current: " + current.keySet());
+      if (!doomed.isEmpty() || !updates.isEmpty())
+        this.cback.update(this, doomed, updates);
+    } catch (Exception ex) {
+      log.error(ex, ex);
+    }
+  }
+  
+  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+    try {
+      ZooReaderWriter.getInstance().delete(serverNode, -1);
+    } catch (NotEmptyException ex) {
+      // race condition: tserver created the lock after our last check; we'll see it at the next check
+    } catch (NoNodeException nne) {
+      // someone else deleted it
+    }
+  }
+  
+  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
+      throws TException, InterruptedException, KeeperException {
+    
+    TServerInfo info = current.get(zPath);
+    
+    final String lockPath = path + "/" + zPath;
+    Stat stat = new Stat();
+    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+    
+    if (lockData == null) {
+      if (info != null) {
+        doomed.add(info.instance);
+        current.remove(zPath);
+        currentInstances.remove(info.instance);
+      }
+      
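+      // reap lockless servers: note when the node was first seen without a lock,
+      // and delete it from zookeeper if it stays lockless for ten minutes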
+      Long firstSeen = locklessServers.get(zPath);
+      if (firstSeen == null) {
+        locklessServers.put(zPath, System.currentTimeMillis());
+      } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
+        deleteServerNode(path + "/" + zPath);
+        locklessServers.remove(zPath);
+      }
+    } else {
+      locklessServers.remove(zPath);
+      ServerServices services = new ServerServices(new String(lockData));
+      InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+      
+      if (info == null) {
+        updates.add(instance);
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.put(instance, tServerInfo);
+      } else if (!info.instance.equals(instance)) {
+        doomed.add(info.instance);
+        updates.add(instance);
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.remove(info.instance);
+        currentInstances.put(instance, tServerInfo);
+      }
+    }
+  }
+  
+  @Override
+  public void process(WatchedEvent event) {
+    
+    // it is important that these events are propagated by ZooCache; this ensures that by the time
+    // the code below reads from zoocache, it has already processed the event and cleared the relevant nodes
+    
+    if (event.getPath() != null) {
+      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+        scanServers();
+      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+        int pos = event.getPath().lastIndexOf('/');
+        
+        // only act if ZTSERVERS is the parent
+        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
+          
+          String server = event.getPath().substring(pos + 1);
+          
+          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+          
+          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+          
+          try {
+            checkServer(updates, doomed, path, server);
+            if (!doomed.isEmpty() || !updates.isEmpty())
+              this.cback.update(this, doomed, updates);
+          } catch (Exception ex) {
+            log.error(ex, ex);
+          }
+        }
+      }
+    }
+  }
+  
+  public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
+    if (server == null)
+      return null;
+    TServerInfo tServerInfo = currentInstances.get(server);
+    if (tServerInfo == null)
+      return null;
+    return tServerInfo.connection;
+  }
+  
+  public synchronized Set<TServerInstance> getCurrentServers() {
+    return new HashSet<TServerInstance>(currentInstances.keySet());
+  }
+  
+  public synchronized int size() {
+    return current.size();
+  }
+  
+  public synchronized TServerInstance find(String tabletServer) {
+    InetSocketAddress addr = AddressUtil.parseAddress(tabletServer);
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.getLocation().equals(addr))
+        return entry.getValue().instance;
+    }
+    return null;
+  }
+  
+  public synchronized void remove(TServerInstance server) {
+    String zPath = null;
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.equals(server)) {
+        zPath = entry.getKey();
+        break;
+      }
+    }
+    if (zPath == null)
+      return;
+    current.remove(zPath);
+    currentInstances.remove(server);
+    
+    log.info("Removing zookeeper lock for " + server);
+    String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
+    try {
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
+    } catch (Exception e) {
+      String msg = "error removing tablet server lock";
+      log.fatal(msg, e);
+      Halt.halt(msg, -1);
+    }
+    getZooCache().clear(fullpath);
+  }
+}
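
For orientation, here is a minimal sketch (illustrative only, not part of the commit)
of how a master-side caller might consume the class above. The Instance and
configuration lookups reuse HdfsZooInstance.getInstance() and
ServerConfiguration.getSiteConfiguration(), both of which appear elsewhere in this
commit; the callback body is a placeholder.

    import java.util.Set;

    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.master.LiveTServerSet;
    import org.apache.accumulo.master.state.TServerInstance;
    import org.apache.accumulo.server.client.HdfsZooInstance;
    import org.apache.accumulo.server.conf.ServerConfiguration;

    public class TServerWatcher {
      public static void main(String[] args) {
        Instance instance = HdfsZooInstance.getInstance();
        AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
        // the listener receives the delta between successive scans of the
        // tserver lock nodes in zookeeper
        LiveTServerSet tservers = new LiveTServerSet(instance, conf, new LiveTServerSet.Listener() {
          @Override
          public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
            System.out.println("lost: " + deleted + " gained: " + added);
          }
        });
        // performs an initial scan, then rescans every five seconds via SimpleTimer
        tservers.startListeningForTabletServerChanges();
      }
    }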

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java
new file mode 100644
index 0000000..0eb6d36
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import jline.console.ConsoleReader;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.tserver.constraints.MetadataConstraints;
+import org.apache.accumulo.tserver.iterators.MetadataBulkLoadFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This class is used to set up the directory structure and the root tablet to get an instance started.
+ * 
+ */
+public class Initialize {
+  private static final Logger log = Logger.getLogger(Initialize.class);
+  private static final String DEFAULT_ROOT_USER = "root";
+  public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
+  
+  private static ConsoleReader reader = null;
+  
+  private static ConsoleReader getConsoleReader() throws IOException {
+    if (reader == null)
+      reader = new ConsoleReader();
+    return reader;
+  }
+  
+  private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>();
+  static {
+    initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
+    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
+    initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
+    initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
+    initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
+    initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
+    initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
+        String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME));
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME,
+        LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME));
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server");
+    initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
+    initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
+    initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+  }
+  
+  public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
+    if (!ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI).equals(""))
+      log.info("Hadoop Filesystem is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI));
+    else
+      log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf));
+    
+    log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
+    log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST));
+    log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
+    if (!zookeeperAvailable()) {
+      log.fatal("Zookeeper needs to be up and running in order to init. Exiting ...");
+      return false;
+    }
+    if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) {
+      ConsoleReader c = getConsoleReader();
+      c.beep();
+      c.println();
+      c.println();
+      c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it.");
+      c.println();
+      c.println();
+      c.println("You can change the instance secret in accumulo by using:");
+      c.println("   bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword.");
+      c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly");
+    }
+    
+    try {
+      if (isInitialized(fs)) {
+        log.fatal("It appears this location was previously initialized, exiting ... ");
+        return false;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    
+    // prompt the user for the instance name and root password early, so that if they
+    // abort we don't leave an inconsistent HDFS/ZooKeeper structure behind
+    String instanceNamePath;
+    try {
+      instanceNamePath = getInstanceNamePath(opts);
+    } catch (Exception e) {
+      log.fatal("Failed to talk to zookeeper", e);
+      return false;
+    }
+    opts.rootpass = getRootPassword(opts);
+    return initialize(opts, instanceNamePath, fs);
+  }
+  
+  public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
+    
+    UUID uuid = UUID.randomUUID();
+    try {
+      initZooKeeper(opts, uuid.toString(), instanceNamePath);
+    } catch (Exception e) {
+      log.fatal("Failed to initialize zookeeper", e);
+      return false;
+    }
+    
+    try {
+      initFileSystem(opts, fs, uuid);
+    } catch (Exception e) {
+      log.fatal("Failed to initialize filesystem", e);
+      return false;
+    }
+    
+    try {
+      initSecurity(opts, uuid.toString());
+    } catch (Exception e) {
+      log.fatal("Failed to initialize security", e);
+      return false;
+    }
+    return true;
+  }
+  
+  private static boolean zookeeperAvailable() {
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    try {
+      return zoo.exists("/");
+    } catch (KeeperException e) {
+      return false;
+    } catch (InterruptedException e) {
+      return false;
+    }
+  }
+  
+  private static Path[] paths(String[] paths) {
+    Path[] result = new Path[paths.length];
+    for (int i = 0; i < paths.length; i++) {
+      result[i] = new Path(paths[i]);
+    }
+    return result;
+  }
+  
+  private static <T> T[] concat(T[] a, T[] b) {
+    List<T> result = new ArrayList<T>(a.length + b.length);
+    for (int i = 0; i < a.length; i++) {
+      result.add(a[i]);
+    }
+    for (int i = 0; i < b.length; i++) {
+      result.add(b[i]);
+    }
+    return result.toArray(a);
+  }
+  
+  private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid) throws IOException {
+    FileStatus fstat;
+    
+    // the actual disk locations of the root table and tablets
+    final Path rootTablet = new Path(ServerConstants.getRootTabletDir());
+    
+    // the actual disk locations of the metadata table and tablets
+    final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs());
+    final Path[] tableMetadataTabletDirs = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR));
+    final Path[] defaultMetadataTabletDirs = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
+    
+    fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
+    
+    // create an instance id
+    fs.mkdirs(ServerConstants.getInstanceIdLocation());
+    fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString()));
+    
+    // initialize initial metadata config in zookeeper
+    initMetadataConfig();
+    
+    // create metadata table
+    for (Path mtd : metadataTableDirs) {
+      try {
+        fstat = fs.getFileStatus(mtd);
+        if (!fstat.isDir()) {
+          log.fatal("location " + mtd.toString() + " exists but is not a directory");
+          return;
+        }
+      } catch (FileNotFoundException fnfe) {
+        if (!fs.mkdirs(mtd)) {
+          log.fatal("unable to create directory " + mtd.toString());
+          return;
+        }
+      }
+    }
+    
+    // create root table and tablet
+    try {
+      fstat = fs.getFileStatus(rootTablet);
+      if (!fstat.isDir()) {
+        log.fatal("location " + rootTablet.toString() + " exists but is not a directory");
+        return;
+      }
+    } catch (FileNotFoundException fnfe) {
+      if (!fs.mkdirs(rootTablet)) {
+        log.fatal("unable to create directory " + rootTablet.toString());
+        return;
+      }
+    }
+    
+    // populate the root tablet with info about the default tablet
+    // the root tablet contains the key extent and locations of all the
+    // metadata tablets
+    String initRootTabFile = rootTablet + "/00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
+    FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile));
+    FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+    mfw.startDefaultLocalityGroup();
+    
+    Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+    
+    // table tablet's directory
+    Key tableDirKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableDirKey, new Value(TABLE_TABLETS_TABLET_DIR.getBytes()));
+    
+    // table tablet time
+    Key tableTimeKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // table tablet's prevrow
+    Key tablePrevRowKey = new Key(tableExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(null));
+    
+    // ---------- default tablet info
+    Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
+    
+    // default's directory
+    Key defaultDirKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes()));
+    
+    // default's time
+    Key defaultTimeKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // default's prevrow
+    Key defaultPrevRowKey = new Key(defaultExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+    
+    mfw.close();
+    
+    // create table and default tablets directories
+    for (Path dir : concat(defaultMetadataTabletDirs, tableMetadataTabletDirs)) {
+      try {
+        fstat = fs.getFileStatus(dir);
+        if (!fstat.isDir()) {
+          log.fatal("location " + dir.toString() + " exists but is not a directory");
+          return;
+        }
+      } catch (FileNotFoundException fnfe) {
+        try {
+          fstat = fs.getFileStatus(dir);
+          if (!fstat.isDir()) {
+            log.fatal("location " + dir.toString() + " exists but is not a directory");
+            return;
+          }
+        } catch (FileNotFoundException fnfe2) {
+          // create table info dir
+          if (!fs.mkdirs(dir)) {
+            log.fatal("unable to create directory " + dir.toString());
+            return;
+          }
+        }
+        
+        // create default dir
+        if (!fs.mkdirs(dir)) {
+          log.fatal("unable to create directory " + dir.toString());
+          return;
+        }
+      }
+    }
+  }
+  
+  private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath) throws KeeperException, InterruptedException {
+    // setup basic data in zookeeper
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    
+    // setup instance name
+    if (opts.clearInstanceName)
+      zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP);
+    zoo.putPersistentData(instanceNamePath, uuid.getBytes(), NodeExistsPolicy.FAIL);
+    
+    // setup the instance
+    String zkInstanceRoot = Constants.ZROOT + "/" + uuid;
+    zoo.putPersistentData(zkInstanceRoot, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL);
+    TableManager.prepareNewTableState(uuid, RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+    TableManager.prepareNewTableState(uuid, MetadataTable.ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(), NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+  }
+  
+  private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
+    // setup the instance name
+    String instanceName, instanceNamePath = null;
+    boolean exists = true;
+    do {
+      if (opts.cliInstanceName == null) {
+        instanceName = getConsoleReader().readLine("Instance name : ");
+      } else {
+        instanceName = opts.cliInstanceName;
+      }
+      if (instanceName == null)
+        System.exit(0);
+      instanceName = instanceName.trim();
+      if (instanceName.length() == 0)
+        continue;
+      instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
+      if (opts.clearInstanceName) {
+        exists = false;
+        break;
+      } else if (exists = ZooReaderWriter.getInstance().exists(instanceNamePath)) {
+        String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : ");
+        if (decision == null)
+          System.exit(0);
+        if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') {
+          opts.clearInstanceName = true;
+          exists = false;
+        }
+      }
+    } while (exists);
+    return instanceNamePath;
+  }
+  
+  private static byte[] getRootPassword(Opts opts) throws IOException {
+    if (opts.cliPassword != null) {
+      return opts.cliPassword.getBytes();
+    }
+    String rootpass;
+    String confirmpass;
+    do {
+      rootpass = getConsoleReader()
+          .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
+      if (rootpass == null)
+        System.exit(0);
+      confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
+      if (confirmpass == null)
+        System.exit(0);
+      if (!rootpass.equals(confirmpass))
+        log.error("Passwords do not match");
+    } while (!rootpass.equals(confirmpass));
+    return rootpass.getBytes();
+  }
+  
+  private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
+    AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER,
+        opts.rootpass);
+  }
+  
+  protected static void initMetadataConfig() throws IOException {
+    try {
+      Configuration conf = CachedConfiguration.getInstance();
+      int max = conf.getInt("dfs.replication.max", 512);
+      // Hadoop 0.23 switched the min value configuration name
+      int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1));
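+      // the metadata table defaults to replication 5 (set in initialMetadataConf);
+      // if HDFS caps replication below 5, or demands more than 5, prompt the user
+      // for a compatible value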
+      if (max < 5)
+        setMetadataReplication(max, "max");
+      if (min > 5)
+        setMetadataReplication(min, "min");
+      for (Entry<String,String> entry : initialMetadataConf.entrySet()) {
+        if (!TablePropUtil.setTableProperty(RootTable.ID, entry.getKey(), entry.getValue()))
+          throw new IOException("Cannot create per-table property " + entry.getKey());
+        if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue()))
+          throw new IOException("Cannot create per-table property " + entry.getKey());
+      }
+    } catch (Exception e) {
+      log.fatal("error talking to zookeeper", e);
+      throw new IOException(e);
+    }
+  }
+  
+  private static void setMetadataReplication(int replication, String reason) throws IOException {
+    String rep = getConsoleReader().readLine(
+        "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your "
+            + MetadataTable.NAME + " replication to? (" + replication + ") ");
+    if (rep == null || rep.length() == 0)
+      rep = Integer.toString(replication);
+    else
+      // Let's make sure it's a number
+      Integer.parseInt(rep);
+    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
+  }
+  
+  public static boolean isInitialized(VolumeManager fs) throws IOException {
+    return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation()));
+  }
+  
+  static class Opts extends Help {
+    @Parameter(names = "--reset-security", description = "just update the security information")
+    boolean resetSecurity = false;
+    @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
+    boolean clearInstanceName = false;
+    @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt")
+    String cliInstanceName;
+    @Parameter(names = "--password", description = "set the password on the command line")
+    String cliPassword;
+    
+    byte[] rootpass = null;
+  }
+  
+  public static void main(String[] args) {
+    Opts opts = new Opts();
+    opts.parseArgs(Initialize.class.getName(), args);
+    
+    try {
+      SecurityUtil.serverLogin();
+      Configuration conf = CachedConfiguration.getInstance();
+      
+      @SuppressWarnings("deprecation")
+      VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration());
+      
+      if (opts.resetSecurity) {
+        if (isInitialized(fs)) {
+          opts.rootpass = getRootPassword(opts);
+          initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID());
+        } else {
+          log.fatal("Attempted to reset security on accumulo before it was initialized");
+        }
+      } else if (!doInit(opts, conf, fs))
+        System.exit(-1);
+    } catch (Exception e) {
+      log.fatal(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+}
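
A hypothetical usage sketch, not part of the commit: the Opts class above defines
--instance-name, --password, --clear-instance-name, and --reset-security, so a
non-interactive initialization could be driven programmatically as below (the
"bin/accumulo init" equivalence is an assumption based on standard Accumulo
packaging).

    import org.apache.accumulo.server.util.Initialize;

    public class InitExample {
      public static void main(String[] args) {
        // roughly: bin/accumulo init --instance-name test --password secret
        Initialize.main(new String[] {"--instance-name", "test", "--password", "secret"});
      }
    }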

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
new file mode 100644
index 0000000..f190cee
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.constraints;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class MetadataConstraints implements Constraint {
+  
+  private ZooCache zooCache = null;
+  private String zooRoot = null;
+  
+  private static final Logger log = Logger.getLogger(MetadataConstraints.class);
+  
+  private static boolean[] validTableNameChars = new boolean[256];
+  
+  static {
+    for (int i = 0; i < 256; i++) {
+      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!';
+    }
+  }
+  
+  private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList(new ColumnFQ[] {
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN,
+      TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN}));
+  
+  private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(new Text[] {TabletsSection.BulkFileColumnFamily.NAME,
+      LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME,
+      TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME,
+      ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME}));
+  
+  private static boolean isValidColumn(ColumnUpdate cu) {
+    
+    if (validColumnFams.contains(new Text(cu.getColumnFamily())))
+      return true;
+    
+    if (validColumnQuals.contains(new ColumnFQ(cu)))
+      return true;
+    
+    return false;
+  }
+  
+  private static ArrayList<Short> addViolation(ArrayList<Short> lst, int violation) {
+    if (lst == null)
+      lst = new ArrayList<Short>();
+    lst.add((short) violation);
+    return lst;
+  }
+  
+  private static ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int intViolation) {
+    if (lst == null)
+      return addViolation(lst, intViolation);
+    short violation = (short) intViolation;
+    if (!lst.contains(violation))
+      return addViolation(lst, intViolation);
+    return lst;
+  }
+  
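+  // violation codes used by check() below, as seen in this portion of the file:
+  //   1 = bad DataFileValue (unparseable or negative size/entries)
+  //   2 = unrecognized column family or qualifier
+  //   3 = prevEndRow is not less than endRow
+  //   4 = malformed metadata row
+  //   5 = row sorts before MetadataTable.ID
+  //   6 = empty value for a column that requires one
+  //   8 = bulk load transaction violation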
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+    
+    ArrayList<Short> violations = null;
+    
+    Collection<ColumnUpdate> colUpdates = mutation.getUpdates();
+    
+    // check the row: it should contain at least one ';' or end with '<'
+    boolean containsSemiC = false;
+    
+    byte[] row = mutation.getRow();
+    
+    // always allow rows that fall within reserved areas
+    if (row.length > 0 && row[0] == '~')
+      return null;
+    if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~')
+      return null;
+    
+    for (byte b : row) {
+      if (b == ';') {
+        containsSemiC = true;
+      }
+      
+      if (b == ';' || b == '<')
+        break;
+      
+      if (!validTableNameChars[0xff & b]) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    if (!containsSemiC) {
+      // see if last row char is <
+      if (row.length == 0 || row[row.length - 1] != '<') {
+        violations = addIfNotPresent(violations, 4);
+      }
+    } else {
+      if (row.length == 0) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    if (row.length > 0 && row[0] == '!') {
+      if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    // ensure row is not less than MetadataTable.ID
+    if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) {
+      violations = addViolation(violations, 5);
+    }
+    
+    boolean checkedBulk = false;
+    
+    for (ColumnUpdate columnUpdate : colUpdates) {
+      Text columnFamily = new Text(columnUpdate.getColumnFamily());
+      
+      if (columnUpdate.isDeleted()) {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        }
+        continue;
+      }
+      
+      if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) {
+        violations = addViolation(violations, 6);
+      }
+      
+      if (columnFamily.equals(DataFileColumnFamily.NAME)) {
+        try {
+          DataFileValue dfv = new DataFileValue(columnUpdate.getValue());
+          
+          if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) {
+            violations = addViolation(violations, 1);
+          }
+        } catch (NumberFormatException nfe) {
+          violations = addViolation(violations, 1);
+        } catch (ArrayIndexOutOfBoundsException aiooe) {
+          violations = addViolation(violations, 1);
+        }
+      } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
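+        // scan file entries are valid as-is; empty values were already permitted for them above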
+        
+      } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+        if (!columnUpdate.isDeleted() && !checkedBulk) {
+          // splits, which also write the time reference, are allowed to write this reference even when
+          // the transaction is not running because the other half of the tablet is holding a reference
+          // to the file.
+          boolean isSplitMutation = false;
+          // When a tablet is assigned, it re-writes the metadata. It should probably only update the location information,
+          // but it writes everything. We allow it to re-write the bulk information if it is setting the location.
+          // See ACCUMULO-1230.
+          boolean isLocationMutation = false;
+          
+          HashSet<Text> dataFiles = new HashSet<Text>();
+          HashSet<Text> loadedFiles = new HashSet<Text>();
+          
+          String tidString = new String(columnUpdate.getValue());
+          int otherTidCount = 0;
+          
+          for (ColumnUpdate update : mutation.getUpdates()) {
+            if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) {
+              isSplitMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
+              isLocationMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(DataFileColumnFamily.NAME)) {
+              dataFiles.add(new Text(update.getColumnQualifier()));
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+              loadedFiles.add(new Text(update.getColumnQualifier()));
+              
+              if (!new String(update.getValue()).equals(tidString)) {
+                otherTidCount++;
+              }
+            }
+          }
+          
+          if (!isSplitMutation && !isLocationMutation) {
+            long tid = Long.parseLong(tidString);
+            
+            try {
+              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+                violations = addViolation(violations, 8);
+              }
+            } catch (Exception ex) {
+              violations = addViolation(violations, 8);
+            }
+          }
+          
+          checkedBulk = true;
+        }
+      } else {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0
+            && (violations == null || !violations.contains((short) 4))) {
+          KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null);
+          
+          Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue()));
+          
+          boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0;
+          
+          if (!prevEndRowLessThanEndRow) {
+            violations = addViolation(violations, 3);
+          }
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.ServerColumnFamily.LOCK_COLUMN)) {
+          if (zooCache == null) {
+            zooCache = new ZooCache();
+          }
+          
+          if (zooRoot == null) {
+            zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
+          }
+          
+          boolean lockHeld = false;
+          String lockId = new String(columnUpdate.getValue());
+          
+          try {
+            lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId));
+          } catch (Exception e) {
+            log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage());
+          }
+          
+          if (!lockHeld) {
+            violations = addViolation(violations, 7);
+          }
+        }
+        
+      }
+    }
+    
+    if (violations != null) {
+      log.debug("violating metadata mutation : " + new String(mutation.getRow()));
+      for (ColumnUpdate update : mutation.getUpdates()) {
+        log.debug(" update: " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " value "
+            + (update.isDeleted() ? "[delete]" : new String(update.getValue())));
+      }
+    }
+    
+    return violations;
+  }
+  
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+  
+  @Override
+  public String getViolationDescription(short violationCode) {
+    switch (violationCode) {
+      case 1:
+        return "data file size must be a non-negative integer";
+      case 2:
+        return "Invalid column name given.";
+      case 3:
+        return "Prev end row is greater than or equal to end row.";
+      case 4:
+        return "Invalid metadata row format";
+      case 5:
+        return "Row can not be less than " + MetadataTable.ID;
+      case 6:
+        return "Empty values are not allowed for any " + MetadataTable.NAME + " column";
+      case 7:
+        return "Lock not held in zookeeper by writer";
+      case 8:
+        return "Bulk load transaction no longer running";
+    }
+    return null;
+  }
+  
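+  // Best-effort cleanup: clear the ZooKeeper cache when this constraint object is garbage collected.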
+  @Override
+  protected void finalize() {
+    if (zooCache != null)
+      zooCache.clear();
+  }
+  
+}
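
A note for readers tracing this class after the move: Accumulo constraints like the one
above are registered on a table by fully qualified class name. The following is a minimal
sketch, not part of this commit; the Connector, table name, and helper class are assumed
for illustration:

    import org.apache.accumulo.core.client.Connector;

    public class RegisterConstraintSketch {
      // Hypothetical helper: registers the constraint on a table by class name
      // and reports the numeric id Accumulo assigns to it.
      static void register(Connector conn, String table) throws Exception {
        int constraintId = conn.tableOperations().addConstraint(table,
            "org.apache.accumulo.tserver.constraints.MetadataConstraints");
        System.out.println("registered constraint " + constraintId);
      }
    }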

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
new file mode 100644
index 0000000..8c4c4e2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.iterators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.log4j.Logger;
+
+/**
+ * A special iterator for the metadata table that removes bulk load flags whose bulk import transactions are no longer active.
+ */
+public class MetadataBulkLoadFilter extends Filter {
+  private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
+  
+  enum Status {
+    ACTIVE, INACTIVE
+  }
+  
+  Map<Long,Status> bulkTxStatusCache;
+  Arbitrator arbitrator;
+  
+  @Override
+  public boolean accept(Key k, Value v) {
+    if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
+      long txid = Long.parseLong(v.toString());
+      
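+      // consult the arbitrator at most once per transaction id for this iterator instance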
+      Status status = bulkTxStatusCache.get(txid);
+      if (status == null) {
+        try {
+          if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+            status = Status.INACTIVE;
+          } else {
+            status = Status.ACTIVE;
+          }
+        } catch (Exception e) {
+          status = Status.ACTIVE;
+          log.error(e, e);
+        }
+        
+        bulkTxStatusCache.put(txid, status);
+      }
+      
+      return status == Status.ACTIVE;
+    }
+    
+    return true;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    
+    if (env.getIteratorScope() == IteratorScope.scan) {
+      throw new IOException("This iterator not intended for use at scan time");
+    }
+    
+    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
+    arbitrator = getArbitrator();
+  }
+  
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+}
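
Because init() above rejects the scan scope, a filter like this is attached to compaction
scopes only. The following is a minimal sketch, not part of this commit; the Connector,
table name, iterator name, and priority 20 are assumptions for illustration:

    import java.util.EnumSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
    import org.apache.accumulo.tserver.iterators.MetadataBulkLoadFilter;

    public class AttachFilterSketch {
      // Hypothetical helper: attaches the filter to the minor and major
      // compaction scopes of the given table.
      static void attach(Connector conn, String table) throws Exception {
        IteratorSetting setting = new IteratorSetting(20, "bulkLoadFilter", MetadataBulkLoadFilter.class);
        conn.tableOperations().attachIterator(table, setting,
            EnumSet.of(IteratorScope.minc, IteratorScope.majc));
      }
    }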

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
new file mode 100644
index 0000000..fb4a3dc
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BulkImporterTest {
+  
+  static final SortedSet<KeyExtent> fakeMetaData = new TreeSet<KeyExtent>();
+  static final Text tableId = new Text("1");
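+  // contiguous fake tablets for table "1": (-inf,a], (a,b], ..., (l,+inf)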
+  static {
+    fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
+      fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
+    }
+    fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
+  }
+  
+  class MockTabletLocator extends TabletLocator {
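+    // counts single-extent cache invalidations; the test expects exactly one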
+    int invalidated = 0;
+    
+    @Override
+    public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
+        TableNotFoundException {
+      return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost");
+    }
+    
+    @Override
+    public <T extends Mutation> void binMutations(Credentials credentials, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
+        throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
+        AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache(KeyExtent failedExtent) {
+      invalidated++;
+    }
+    
+    @Override
+    public void invalidateCache(Collection<KeyExtent> keySet) {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache() {
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void invalidateCache(String server) {
+      throw new NotImplementedException();
+    }
+  }
+  
+  @Test
+  public void testFindOverlappingTablets() throws Exception {
+    Credentials credentials = null;
+    MockTabletLocator locator = new MockTabletLocator();
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    AccumuloConfiguration acuConf = AccumuloConfiguration.getDefaultConfiguration();
+    String file = "target/testFile.rf";
+    fs.delete(new Path(file), true);
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), acuConf);
+    writer.startDefaultLocalityGroup();
+    Value empty = new Value(new byte[] {});
+    writer.append(new Key("a", "cf", "cq"), empty);
+    writer.append(new Key("a", "cf", "cq1"), empty);
+    writer.append(new Key("a", "cf", "cq2"), empty);
+    writer.append(new Key("a", "cf", "cq3"), empty);
+    writer.append(new Key("a", "cf", "cq4"), empty);
+    writer.append(new Key("a", "cf", "cq5"), empty);
+    writer.append(new Key("d", "cf", "cq"), empty);
+    writer.append(new Key("d", "cf", "cq1"), empty);
+    writer.append(new Key("d", "cf", "cq2"), empty);
+    writer.append(new Key("d", "cf", "cq3"), empty);
+    writer.append(new Key("d", "cf", "cq4"), empty);
+    writer.append(new Key("d", "cf", "cq5"), empty);
+    writer.append(new Key("dd", "cf", "cq1"), empty);
+    writer.append(new Key("ichabod", "cf", "cq"), empty);
+    writer.append(new Key("icky", "cf", "cq1"), empty);
+    writer.append(new Key("iffy", "cf", "cq2"), empty);
+    writer.append(new Key("internal", "cf", "cq3"), empty);
+    writer.append(new Key("is", "cf", "cq4"), empty);
+    writer.append(new Key("iterator", "cf", "cq5"), empty);
+    writer.append(new Key("xyzzy", "cf", "cq"), empty);
+    writer.close();
+    List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), credentials);
+    Assert.assertEquals(5, overlaps.size());
+    Collections.sort(overlaps);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent);
+    
+    List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text(
+        "b")), credentials);
+    Assert.assertEquals(3, overlaps2.size());
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent);
+    Assert.assertEquals(locator.invalidated, 1);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
new file mode 100644
index 0000000..fbae24c
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.constraints;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.tserver.constraints.MetadataConstraints;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class MetadataConstraintsTest {
+  
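+  // Stub arbitrator: txids 5 and 7 are alive, txid 9 throws, and all other txids are complete.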
+  static class TestMetadataConstraints extends MetadataConstraints {
+    @Override
+    protected Arbitrator getArbitrator() {
+      return new Arbitrator() {
+        
+        @Override
+        public boolean transactionAlive(String type, long tid) throws Exception {
+          if (tid == 9)
+            throw new RuntimeException("txid 9 reserved for future use");
+          return tid == 5 || tid == 7;
+        }
+        
+        @Override
+        public boolean transactionComplete(String type, long tid) throws Exception {
+          return tid != 5 && tid != 7;
+        }
+      };
+    }
+  }
+  
+  @Test
+  public void testCheck() {
+    Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR);
+    Mutation m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1foo".getBytes()));
+    
+    MetadataConstraints mc = new MetadataConstraints();
+    
+    List<Short> violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 3), violations.get(0));
+    
+    m = new Mutation(new Text("0:foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    
+    m = new Mutation(new Text("0;foo"));
+    m.put(new Text("bad_column_name"), new Text(""), new Value("e".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 2), violations.get(0));
+    
+    m = new Mutation(new Text("!!<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(2, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    assertEquals(Short.valueOf((short) 5), violations.get(1));
+    
+    m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 6), violations.get(0));
+    
+    m = new Mutation(new Text("0;foo"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNull(violations);
+    
+    m = new Mutation(new Text("!0<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNull(violations);
+    
+    m = new Mutation(new Text("!1<"));
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes()));
+    
+    violations = mc.check(null, m);
+    
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 4), violations.get(0));
+    
+  }
+  
+  @Test
+  public void testBulkFileCheck() {
+    MetadataConstraints mc = new TestMetadataConstraints();
+    Mutation m;
+    List<Short> violations;
+    
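+    // BulkFileColumnFamily values hold the bulk txid; DataFileColumnFamily values encode "<size>,<entries>"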
+    // inactive txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // txid that throws exception
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("9".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid w/ file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid w/o file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two active txids w/ files
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("7".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two files w/ one active txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // two loaded w/ one active txid and one file
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes()));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes()));
+    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes()));
+    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // deleting a load flag
+    m = new Mutation(new Text("0;foo"));
+    m.putDelete(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
new file mode 100644
index 0000000..0df27f1
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+public class ServerMutationTest {
+  
+  @Test
+  public void test() throws Exception {
+    ServerMutation m = new ServerMutation(new Text("r1"));
+    m.put(new Text("cf1"), new Text("cq1"), new Value("v1".getBytes()));
+    m.put(new Text("cf2"), new Text("cq2"), 56, new Value("v2".getBytes()));
+    m.setSystemTimestamp(42);
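+    // cf1/cq1 has no explicit timestamp, so it should report the system timestamp (42); cf2/cq2 keeps 56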
+    
+    List<ColumnUpdate> updates = m.getUpdates();
+    
+    assertEquals(2, updates.size());
+    
+    assertEquals("r1", new String(m.getRow()));
+    ColumnUpdate cu = updates.get(0);
+    
+    assertEquals("cf1", new String(cu.getColumnFamily()));
+    assertEquals("cq1", new String(cu.getColumnQualifier()));
+    assertEquals("", new String(cu.getColumnVisibility()));
+    assertFalse(cu.hasTimestamp());
+    assertEquals(42L, cu.getTimestamp());
+    
+    ServerMutation m2 = new ServerMutation();
+    ReflectionUtils.copy(CachedConfiguration.getInstance(), m, m2);
+    
+    updates = m2.getUpdates();
+    
+    assertEquals(2, updates.size());
+    assertEquals("r1", new String(m2.getRow()));
+    
+    cu = updates.get(0);
+    assertEquals("cf1", new String(cu.getColumnFamily()));
+    assertEquals("cq1", new String(cu.getColumnQualifier()));
+    assertFalse(cu.hasTimestamp());
+    assertEquals(42L, cu.getTimestamp());
+    
+    cu = updates.get(1);
+    
+    assertEquals("r1", new String(m2.getRow()));
+    assertEquals("cf2", new String(cu.getColumnFamily()));
+    assertEquals("cq2", new String(cu.getColumnQualifier()));
+    assertTrue(cu.hasTimestamp());
+    assertEquals(56, cu.getTimestamp());
+    
+  }
+  
+}

