accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [31/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date Tue, 04 Feb 2014 17:55:00 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index a9cdb15,0000000..63bd894
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@@ -1,399 -1,0 +1,399 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master;
 +
 +import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
 +
 +import java.nio.ByteBuffer;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +import org.apache.zookeeper.KeeperException.NotEmptyException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class LiveTServerSet implements Watcher {
 +
 +  public interface Listener {
 +    void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
 +  }
 +
 +  private static final Logger log = Logger.getLogger(LiveTServerSet.class);
 +
 +  private final Listener cback;
 +  private final Instance instance;
 +  private final AccumuloConfiguration conf;
 +  private ZooCache zooCache;
 +
 +  public class TServerConnection {
 +    private final HostAndPort address;
 +
 +    public TServerConnection(HostAndPort addr) throws TException {
 +      address = addr;
 +    }
 +
 +    private String lockString(ZooLock mlock) {
 +      return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
 +    }
 +
 +    public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    /**
 +     * Asks the tablet server at this connection's address for its status, using a freshly created transport that is closed before the method returns.
 +     * 
 +     * @param usePooledConnection
 +     *          must be false; a pooled connection is not supported by this method
 +     * @return the status returned by the tablet server
 +     * @throws UnsupportedOperationException
 +     *           if usePooledConnection is true
 +     */
 +    public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
 +      if (usePooledConnection) {
 +        throw new UnsupportedOperationException();
 +      }
 +
 +      final TTransport dedicated = ThriftUtil.createTransport(address, conf);
 +      try {
 +        final TabletClientService.Client statusClient = ThriftUtil.createClient(new TabletClientService.Client.Factory(), dedicated);
 +        return statusClient.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
 +      } finally {
 +        if (dedicated != null) {
 +          dedicated.close();
 +        }
 +      }
 +    }
 +
 +    public void halt(ZooLock lock) throws TException, ThriftSecurityException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void fastHalt(ZooLock lock) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
 +            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void chop(ZooLock lock, KeyExtent extent) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
 +            ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
 +            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +    public boolean isActive(long tid) throws TException {
 +      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 +      try {
 +        return client.isActive(Tracer.traceInfo(), tid);
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    }
 +
 +  }
 +
 +  static class TServerInfo {
 +    TServerConnection connection;
 +    TServerInstance instance;
 +
 +    TServerInfo(TServerInstance instance, TServerConnection connection) {
 +      this.connection = connection;
 +      this.instance = instance;
 +    }
 +  };
 +
 +  // The set of active tservers with locks, indexed by their name in zookeeper
 +  private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
 +  // as above, indexed by TServerInstance
 +  private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
 +
 +  // The set of entries in zookeeper without locks, and the first time each was noticed
 +  private Map<String,Long> locklessServers = new HashMap<String,Long>();
 +
 +  public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
 +    this.cback = cback;
 +    this.instance = instance;
 +    this.conf = conf;
 +
 +  }
 +
 +  public synchronized ZooCache getZooCache() {
 +    if (zooCache == null)
 +      zooCache = new ZooCache(this);
 +    return zooCache;
 +  }
 +
 +  public synchronized void startListeningForTabletServerChanges() {
 +    scanServers();
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        scanServers();
 +      }
 +    }, 0, 5000);
 +  }
 +
 +  public synchronized void scanServers() {
 +    try {
 +      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
 +      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
 +
 +      final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
 +
 +      HashSet<String> all = new HashSet<String>(current.keySet());
 +      all.addAll(getZooCache().getChildren(path));
 +
 +      locklessServers.keySet().retainAll(all);
 +
 +      for (String zPath : all) {
 +        checkServer(updates, doomed, path, zPath);
 +      }
 +
 +      // log.debug("Current: " + current.keySet());
 +      if (!doomed.isEmpty() || !updates.isEmpty())
 +        this.cback.update(this, doomed, updates);
 +    } catch (Exception ex) {
 +      log.error(ex, ex);
 +    }
 +  }
 +
 +  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
 +    try {
 +      ZooReaderWriter.getInstance().delete(serverNode, -1);
 +    } catch (NotEmptyException ex) {
 +      // race condition: tserver created the lock after our last check; we'll see it at the next check
 +    } catch (NoNodeException nne) {
 +      // someone else deleted it
 +    }
 +  }
 +
 +  /**
 +   * Reconciles the in-memory view of a single tablet server node with the lock data currently stored under that node in ZooKeeper.
 +   * <p>
 +   * If the node holds no lock data, any server previously known for it is added to <code>doomed</code> and removed from both maps; the node itself is deleted
 +   * once it has been observed without a lock for more than ten minutes. If lock data is present, the server it advertises is recorded; when that server
 +   * differs from the one previously known for the node, the old instance is added to <code>doomed</code> and the new one to <code>updates</code>.
 +   * 
 +   * @param updates
 +   *          receives instances discovered by this check
 +   * @param doomed
 +   *          receives instances that are no longer present
 +   * @param path
 +   *          ZooKeeper path of the parent node of all tablet server nodes
 +   * @param zPath
 +   *          name of the child node to examine
 +   */
 +  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
 +      throws TException, InterruptedException, KeeperException {
 +
 +    TServerInfo info = current.get(zPath);
 +
 +    final String lockPath = path + "/" + zPath;
 +    Stat stat = new Stat();
 +    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
 +
 +    if (lockData == null) {
 +      if (info != null) {
 +        doomed.add(info.instance);
 +        current.remove(zPath);
 +        currentInstances.remove(info.instance);
 +      }
 +
 +      // remember when the node was first seen without a lock, and remove it once it has stayed lockless for ten minutes
 +      Long firstSeen = locklessServers.get(zPath);
 +      if (firstSeen == null) {
 +        locklessServers.put(zPath, System.currentTimeMillis());
 +      } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
 +        deleteServerNode(lockPath);
 +        locklessServers.remove(zPath);
 +      }
 +    } else {
 +      locklessServers.remove(zPath);
-       ServerServices services = new ServerServices(new String(lockData));
++      ServerServices services = new ServerServices(new String(lockData, Constants.UTF8));
 +      HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
 +      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
 +
 +      if (info == null) {
 +        updates.add(instance);
 +        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
 +        current.put(zPath, tServerInfo);
 +        currentInstances.put(instance, tServerInfo);
 +      } else if (!info.instance.equals(instance)) {
 +        doomed.add(info.instance);
 +        updates.add(instance);
 +        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
 +        current.put(zPath, tServerInfo);
 +        // Drop the replaced instance and index the new one under its own key. Storing the new info under the old key would keep the doomed instance
 +        // resolvable through getConnection() while leaving the new instance unknown to getConnection() and getCurrentServers().
 +        currentInstances.remove(info.instance);
 +        currentInstances.put(instance, tServerInfo);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void process(WatchedEvent event) {
 +
 +    // It's important that these events are propagated by ZooCache, because this ensures that when zoocache is read below it has already processed the event
 +    // and cleared the relevant nodes.
 +
 +    if (event.getPath() != null) {
 +      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
 +        scanServers();
 +      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
 +        int pos = event.getPath().lastIndexOf('/');
 +
 +        // only handle the event when the ZTSERVERS node is the direct parent, i.e. the last path component names a single server node
 +        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
 +
 +          String server = event.getPath().substring(pos + 1);
 +
 +          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
 +          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
 +
 +          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
 +
 +          try {
 +            checkServer(updates, doomed, path, server);
 +            if (!doomed.isEmpty() || !updates.isEmpty())
 +              this.cback.update(this, doomed, updates);
 +          } catch (Exception ex) {
 +            log.error(ex, ex);
 +          }
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Looks up the connection object for a tablet server instance.
 +   * 
 +   * @param server
 +   *          the instance to look up; may be null
 +   * @return the connection, or null when server is null or is not among the currently tracked instances
 +   */
 +  public synchronized TServerConnection getConnection(TServerInstance server) {
 +    final TServerInfo known = (server == null) ? null : currentInstances.get(server);
 +    return (known == null) ? null : known.connection;
 +  }
 +
 +  public synchronized Set<TServerInstance> getCurrentServers() {
 +    return new HashSet<TServerInstance>(currentInstances.keySet());
 +  }
 +
 +  public synchronized int size() {
 +    return current.size();
 +  }
 +
 +  /**
 +   * Finds a currently tracked tablet server by its address.
 +   * 
 +   * @param tabletServer
 +   *          address string, parsed with AddressUtil.parseAddress
 +   * @return the first tracked instance whose location equals the parsed address, or null if there is none
 +   */
 +  public synchronized TServerInstance find(String tabletServer) {
 +    final HostAndPort wanted = AddressUtil.parseAddress(tabletServer, false);
 +    for (TServerInfo candidate : current.values()) {
 +      if (candidate.instance.getLocation().equals(wanted)) {
 +        return candidate.instance;
 +      }
 +    }
 +    return null;
 +  }
 +
 +  public synchronized void remove(TServerInstance server) {
 +    String zPath = null;
 +    for (Entry<String,TServerInfo> entry : current.entrySet()) {
 +      if (entry.getValue().instance.equals(server)) {
 +        zPath = entry.getKey();
 +        break;
 +      }
 +    }
 +    if (zPath == null)
 +      return;
 +    current.remove(zPath);
 +    currentInstances.remove(server);
 +
 +    log.info("Removing zookeeper lock for " + server);
 +    String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
 +    try {
 +      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
 +    } catch (Exception e) {
 +      String msg = "error removing tablet server lock";
 +      log.fatal(msg, e);
 +      Halt.halt(msg, -1);
 +    }
 +    getZooCache().clear(fullpath);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
index e1ffd2f,0000000..2f657c4
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
@@@ -1,89 -1,0 +1,90 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.master.thrift.DeadServer;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.data.Stat;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +
 +public class DeadServerList {
 +  private static final Logger log = Logger.getLogger(DeadServerList.class);
 +  private final String path;
 +  
 +  public DeadServerList(String path) {
 +    this.path = path;
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    try {
 +      zoo.mkdirs(path);
 +    } catch (Exception ex) {
 +      log.error("Unable to make parent directories of " + path, ex);
 +    }
 +  }
 +  
 +  public List<DeadServer> getList() {
 +    List<DeadServer> result = new ArrayList<DeadServer>();
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    try {
 +      List<String> children = zoo.getChildren(path);
 +      if (children != null) {
 +        for (String child : children) {
 +          Stat stat = new Stat();
 +          byte[] data;
 +          try {
 +            data = zoo.getData(path + "/" + child, stat);
 +          } catch (NoNodeException nne) {
 +            // Another thread or process can delete child while this loop is running.
 +            // We ignore this error since it's harmless if we miss the deleted server
 +            // in the dead server list.
 +            continue;
 +          }
-           DeadServer server = new DeadServer(child, stat.getMtime(), new String(data));
++          DeadServer server = new DeadServer(child, stat.getMtime(), new String(data, Constants.UTF8));
 +          result.add(server);
 +        }
 +      }
 +    } catch (Exception ex) {
 +      log.error(ex, ex);
 +    }
 +    return result;
 +  }
 +  
 +  public void delete(String server) {
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    try {
 +      zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP);
 +    } catch (Exception ex) {
 +      log.error(ex, ex);
 +    }
 +  }
 +  
 +  public void post(String server, String cause) {
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    try {
-       zoo.putPersistentData(path + "/" + server, cause.getBytes(), NodeExistsPolicy.SKIP);
++      zoo.putPersistentData(path + "/" + server, cause.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP);
 +    } catch (Exception ex) {
 +      log.error(ex, ex);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index 90a9d19,0000000..f0ac7bc
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@@ -1,191 -1,0 +1,191 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.ScannerBase;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +public class MetaDataTableScanner implements Iterator<TabletLocationState> {
 +  private static final Logger log = Logger.getLogger(MetaDataTableScanner.class);
 +  
 +  BatchScanner mdScanner = null;
 +  Iterator<Entry<Key,Value>> iter = null;
 +  
 +  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state) {
 +    this(instance, credentials, range, state, MetadataTable.NAME);
 +  }
 +  
 +  MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state, String tableName) {
 +    // scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
 +    try {
 +      Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
 +      mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8);
 +      configureScanner(mdScanner, state);
 +      mdScanner.setRanges(Collections.singletonList(range));
 +      iter = mdScanner.iterator();
 +    } catch (Exception ex) {
 +      if (mdScanner != null)
 +        mdScanner.close();
 +      iter = null;
 +      mdScanner = null;
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +  
 +  static public void configureScanner(ScannerBase scanner, CurrentState state) {
 +    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
 +    scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
 +    scanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME);
 +    scanner.fetchColumnFamily(LogColumnFamily.NAME);
 +    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
 +    scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
 +    IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
 +    if (state != null) {
 +      TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
 +      TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
 +      TabletStateChangeIterator.setMerges(tabletChange, state.merges());
 +    }
 +    scanner.addScanIterator(tabletChange);
 +  }
 +  
 +  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range) {
 +    this(instance, credentials, range, MetadataTable.NAME);
 +  }
 +  
 +  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, String tableName) {
 +    this(instance, credentials, range, null, tableName);
 +  }
 +  
 +  /**
 +   * Closes the batch scanner and drops the iterator. Does nothing when the iterator has already been dropped, so it is safe to call more than once.
 +   */
 +  public void close() {
 +    if (iter == null) {
 +      return;
 +    }
 +    mdScanner.close();
 +    iter = null;
 +  }
 +  
 +  @Override
-   public void finalize() {
++  protected void finalize() {
 +    close();
 +  }
 +  
 +  @Override
 +  public boolean hasNext() {
 +    if (iter == null)
 +      return false;
 +    boolean result = iter.hasNext();
 +    if (!result) {
 +      close();
 +    }
 +    return result;
 +  }
 +  
 +  /**
 +   * Returns the state of the next tablet read from the scanner.
 +   * 
 +   * @return the next tablet location state, as produced by fetch()
 +   * @throws java.util.NoSuchElementException
 +   *           if the scanner has already been closed, which hasNext() does as soon as it finds no further entries
 +   */
 +  @Override
 +  public TabletLocationState next() {
 +    // close() sets iter to null; report that through the exception the Iterator contract specifies rather than a NullPointerException
 +    if (iter == null) {
 +      throw new java.util.NoSuchElementException();
 +    }
 +    return fetch();
 +  }
 +  
 +  /**
 +   * Builds a TabletLocationState from one encoded whole-row entry.
 +   * <p>
 +   * The row is decoded with WholeRowIterator.decodeRow and its columns are folded into the future, current and last locations, the write-ahead log sets, the
 +   * chopped flag and the extent. Among several last-location entries the one with the greatest timestamp is kept.
 +   * 
 +   * @param k
 +   *          key of the encoded row
 +   * @param v
 +   *          value of the encoded row
 +   * @return the tablet state, or null (after logging a warning) when the row has no prev-row column
 +   * @throws IOException
 +   *           declared for failures while decoding the row
 +   * @throws BadLocationStateException
 +   *           if the row holds two future locations or two current locations
 +   */
 +  public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException {
 +    final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
 +    KeyExtent extent = null;
 +    TServerInstance future = null;
 +    TServerInstance current = null;
 +    TServerInstance last = null;
 +    long lastTimestamp = 0;
 +    List<Collection<String>> walogs = new ArrayList<Collection<String>>();
 +    boolean chopped = false;
 +
 +    for (Entry<Key,Value> entry : decodedRow.entrySet()) {
 +      Key key = entry.getKey();
 +      Text row = key.getRow();
 +      Text cf = key.getColumnFamily();
 +      Text cq = key.getColumnQualifier();
 +
 +      if (cf.compareTo(TabletsSection.FutureLocationColumnFamily.NAME) == 0) {
 +        TServerInstance location = new TServerInstance(entry.getValue(), cq);
 +        if (future != null) {
 +          throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location, entry.getKey().getRow());
 +        }
 +        future = location;
 +      } else if (cf.compareTo(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
 +        TServerInstance location = new TServerInstance(entry.getValue(), cq);
 +        if (current != null) {
 +          throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location, entry.getKey().getRow());
 +        }
 +        current = location;
 +      } else if (cf.compareTo(LogColumnFamily.NAME) == 0) {
 +        // the part of the value before the first '|' is a ';'-separated list that is recorded as one log set
 +        String[] split = entry.getValue().toString().split("\\|")[0].split(";");
 +        walogs.add(Arrays.asList(split));
 +      } else if (cf.compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
 +        // remember the winning timestamp so that an older entry seen later cannot replace a newer one
 +        if (lastTimestamp < key.getTimestamp()) {
 +          last = new TServerInstance(entry.getValue(), cq);
 +          lastTimestamp = key.getTimestamp();
 +        }
 +      } else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) {
 +        chopped = true;
 +      } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) {
 +        extent = new KeyExtent(row, entry.getValue());
 +      }
 +    }
 +    if (extent == null) {
 +      log.warn("No prev-row for key extent: " + decodedRow);
 +      return null;
 +    }
 +    return new TabletLocationState(extent, future, current, last, walogs, chopped);
 +  }
 +  
 +  private TabletLocationState fetch() {
 +    try {
 +      Entry<Key,Value> e = iter.next();
 +      return createTabletLocationState(e.getKey(), e.getValue());
 +    } catch (IOException ex) {
 +      throw new RuntimeException(ex);
 +    } catch (BadLocationStateException ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Removal is not supported by this scanner.
 +   * 
 +   * @throws UnsupportedOperationException
 +   *           always
 +   */
 +  @Override
 +  public void remove() {
 +    // the Iterator contract specifies UnsupportedOperationException here; it is a RuntimeException, so callers catching the old type still work
 +    throw new UnsupportedOperationException("Unimplemented");
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index e7dce67,0000000..400d0ac
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@@ -1,142 -1,0 +1,143 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.hadoop.io.Text;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * A tablet is assigned to a tablet server at the given address as long as it is alive and well. When the tablet server is restarted, the instance information
 + * it advertises will change. Therefore tablet assignments can be considered out-of-date if the tablet server instance information has been changed.
 + * 
 + */
 +public class TServerInstance implements Comparable<TServerInstance>, Serializable {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  // HostAndPort is not Serializable
 +  private transient HostAndPort location;
 +  // session identifier of the tablet server; used as the column qualifier of location entries
 +  private String session;
 +  // hostPort() + "[" + session + "]", built once in the constructor; toString(), compareTo(), equals() and hashCode() all rely on it
 +  private String cachedStringRepresentation;
 +  
 +  /**
 +   * Creates an instance from an already parsed address and a session string.
 +   */
 +  public TServerInstance(HostAndPort address, String session) {
 +    this.location = address;
 +    this.session = session;
 +    this.cachedStringRepresentation = hostPort() + "[" + session + "]";
 +  }
 +  
 +  /**
 +   * Creates an instance whose session string is the hexadecimal form of the given number.
 +   */
 +  public TServerInstance(HostAndPort address, long session) {
 +    this(address, Long.toHexString(session));
 +  }
 +  
 +  /**
 +   * Creates an instance from an address string (parsed with AddressUtil.parseAddress) and a numeric session, stored as hexadecimal.
 +   */
 +  public TServerInstance(String address, long session) {
 +    this(AddressUtil.parseAddress(address, false), Long.toHexString(session));
 +  }
 +  
 +  /**
 +   * Creates an instance from a stored location entry: the value holds the address text and the qualifier text holds the session.
 +   */
 +  public TServerInstance(Value address, Text session) {
-     this(AddressUtil.parseAddress(new String(address.get()), false), session.toString());
++    this(AddressUtil.parseAddress(new String(address.get(), Constants.UTF8), false), session.toString());
 +  }
 +  
 +  /**
 +   * Adds this server as the current location to the mutation (family CurrentLocationColumnFamily, qualifier = session, value = host:port).
 +   */
 +  public void putLocation(Mutation m) {
 +    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
 +  }
 +  
 +  /**
 +   * Adds this server as the future location to the mutation (family FutureLocationColumnFamily).
 +   */
 +  public void putFutureLocation(Mutation m) {
 +    m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
 +  }
 +  
 +  /**
 +   * Adds this server as the last location to the mutation (family LastLocationColumnFamily).
 +   */
 +  public void putLastLocation(Mutation m) {
 +    m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
 +  }
 +  
 +  /**
 +   * Adds a delete of the last-location entry whose qualifier is this server's session.
 +   */
 +  public void clearLastLocation(Mutation m) {
 +    m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier());
 +  }
 +  
 +  /**
 +   * Orders instances by their string form, "host:port[session]".
 +   */
 +  @Override
 +  public int compareTo(TServerInstance other) {
 +    if (this == other)
 +      return 0;
 +    return this.toString().compareTo(other.toString());
 +  }
 +  
 +  // hash of the string form, which keeps hashCode() consistent with equals()
 +  @Override
 +  public int hashCode() {
 +    return toString().hashCode();
 +  }
 +  
 +  /**
 +   * Two instances are equal when their string forms (address and session) are equal.
 +   */
 +  @Override
 +  public boolean equals(Object obj) {
 +    if (obj instanceof TServerInstance) {
 +      return compareTo((TServerInstance) obj) == 0;
 +    }
 +    return false;
 +  }
 +  
 +  /**
 +   * @return the cached "host:port[session]" representation
 +   */
 +  @Override
 +  public String toString() {
 +    return cachedStringRepresentation;
 +  }
 +  
 +  /**
 +   * @return the port of the server address
 +   */
 +  public int port() {
 +    return getLocation().getPort();
 +  }
 +  
 +  /**
 +   * @return the host part of the server address
 +   */
 +  public String host() {
 +    return getLocation().getHostText();
 +  }
 +  
 +  /**
 +   * @return the server address as rendered by HostAndPort.toString()
 +   */
 +  public String hostPort() {
 +    return getLocation().toString();
 +  }
 +  
 +  /**
 +   * @return the session wrapped in a Text, for use as a column qualifier
 +   */
 +  public Text asColumnQualifier() {
 +    return new Text(this.getSession());
 +  }
 +  
 +  /**
 +   * @return the server address text encoded as a Value, for use as the value of a location entry
 +   */
 +  public Value asMutationValue() {
-     return new Value(getLocation().toString().getBytes());
++    return new Value(getLocation().toString().getBytes(Constants.UTF8));
 +  }
 +  
 +  public HostAndPort getLocation() {
 +    return location;
 +  }
 +  
 +  public String getSession() {
 +    return session;
 +  }
 +  
 +  // Custom serialization: the transient location is appended as a string after the default fields.
 +  private void writeObject(ObjectOutputStream out) throws IOException {
 +    out.defaultWriteObject();
 +    out.writeObject(location.toString());
 +  }
 +  
 +  // Mirror of writeObject: restores the default fields, then rebuilds location from the string that follows them.
 +  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 +    in.defaultReadObject();
 +    location = HostAndPort.fromString(in.readObject().toString());
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
index d1cdd9d,0000000..5749523
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@@ -1,188 -1,0 +1,189 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +
 +public class TabletStateChangeIterator extends SkippingIterator {
 +  
 +  private static final String SERVERS_OPTION = "servers";
 +  private static final String TABLES_OPTION = "tables";
 +  private static final String MERGES_OPTION = "merges";
 +  // private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class);
 +  
 +  Set<TServerInstance> current;
 +  Set<String> onlineTables;
 +  Map<Text,MergeInfo> merges;
 +  
 +  /**
 +   * Initializes the iterator and parses its three options: live tablet servers, online tables and in-progress merges. Each parsed field is null when its option
 +   * is absent.
 +   */
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    current = parseServers(options.get(SERVERS_OPTION));
 +    onlineTables = parseTables(options.get(TABLES_OPTION));
 +    merges = parseMerges(options.get(MERGES_OPTION));
 +  }
 +  
 +  /**
 +   * Splits the comma separated "tables" option into a set.
 +   * 
 +   * @return the set of entries, or null when the option was not supplied
 +   */
 +  private Set<String> parseTables(String tables) {
 +    if (tables == null)
 +      return null;
 +    Set<String> result = new HashSet<String>();
 +    for (String table : tables.split(","))
 +      result.add(table);
 +    return result;
 +  }
 +  
 +  /**
 +   * Parses the "servers" option, a comma separated list of entries of the form host:port[INSTANCE].
 +   * 
 +   * @param servers
 +   *          the option value; may be null or empty
 +   * @return the parsed servers (empty for an empty string), or null when the option was not supplied
 +   * @throws IllegalArgumentException
 +   *           if an entry has no '[' separating the address from the instance
 +   */
 +  private Set<TServerInstance> parseServers(String servers) {
 +    if (servers == null)
 +      return null;
 +    // parse "host:port[INSTANCE]"
 +    Set<TServerInstance> result = new HashSet<TServerInstance>();
 +    if (servers.length() > 0) {
 +      for (String part : servers.split(",")) {
 +        String parts[] = part.split("\\[", 2);
 +        // without a '[' there is no instance to pair with the address; report the bad entry instead of failing with an ArrayIndexOutOfBoundsException
 +        if (parts.length < 2)
 +          throw new IllegalArgumentException("Expected host:port[INSTANCE] but found: " + part);
 +        String hostport = parts[0];
 +        String instance = parts[1];
 +        if (instance.endsWith("]"))
 +          instance = instance.substring(0, instance.length() - 1);
 +        result.add(new TServerInstance(AddressUtil.parseAddress(hostport, false), instance));
 +      }
 +    }
 +    return result;
 +  }
 +  
 +  /**
 +   * Decodes the "merges" option: Base64 text holding MergeInfo records written back to back. Records are read until the buffer is exhausted and are keyed by the
 +   * table id of their extent, so a later record for the same table replaces an earlier one.
 +   * 
 +   * @return the merges by table id, or null when the option was not supplied
 +   * @throws RuntimeException
 +   *           wrapping any failure while decoding
 +   */
 +  private Map<Text,MergeInfo> parseMerges(String merges) {
 +    if (merges == null)
 +      return null;
 +    try {
 +      Map<Text,MergeInfo> result = new HashMap<Text,MergeInfo>();
 +      DataInputBuffer buffer = new DataInputBuffer();
-       byte[] data = Base64.decodeBase64(merges.getBytes());
++      byte[] data = Base64.decodeBase64(merges.getBytes(Constants.UTF8));
 +      buffer.reset(data, data.length);
 +      while (buffer.available() > 0) {
 +        MergeInfo mergeInfo = new MergeInfo();
 +        mergeInfo.readFields(buffer);
 +        result.put(mergeInfo.extent.getTableId(), mergeInfo);
 +      }
 +      return result;
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Advances the source past tablets that are already in their expected state. Returning from this method leaves the current top entry in place, i.e. that tablet
 +   * is reported; calling getSource().next() skips it.
 +   * <p>
 +   * A tablet is reported when the server or table options are missing, when its state cannot be decoded, when it overlaps a merge, when it is assigned, when it
 +   * is assigned to a dead server, when it is hosted although its table is offline, or when it is unassigned although its table is online.
 +   */
 +  @Override
 +  protected void consume() throws IOException {
 +    while (getSource().hasTop()) {
 +      Key k = getSource().getTopKey();
 +      Value v = getSource().getTopValue();
 +      
 +      if (onlineTables == null || current == null)
 +        return;
 +      
 +      TabletLocationState tls;
 +      try {
 +        tls = MetaDataTableScanner.createTabletLocationState(k, v);
 +        if (tls == null)
 +          return;
 +      } catch (BadLocationStateException e) {
 +        // maybe the master can do something with a tablet with bad/inconsistent state
 +        return;
 +      }
 +      // we always want data about merges; a missing merges option means there are none
 +      MergeInfo merge = merges == null ? null : merges.get(tls.extent.getTableId());
 +      if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) {
 +        return;
 +      }
 +      // is the table supposed to be online or offline?
 +      boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
 +      
 +      switch (tls.getState(current)) {
 +        case ASSIGNED:
 +          // we always want data about assigned tablets
 +          return;
 +        case HOSTED:
 +          if (!shouldBeOnline)
 +            return;
 +          // hosted and supposed to be online: expected state, must not fall through to the next case
 +          break;
 +        case ASSIGNED_TO_DEAD_SERVER:
 +          return;
 +        case UNASSIGNED:
 +          if (shouldBeOnline)
 +            return;
 +          break;
 +      }
 +      // table is in the expected state so don't bother returning any information about it
 +      getSource().next();
 +    }
 +  }
 +  
 +  /**
 +   * Not supported by this iterator.
 +   * 
 +   * @throws UnsupportedOperationException
 +   *           always
 +   */
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +  /**
 +   * Stores the given servers in the iterator configuration as a comma separated list of their toString() forms. Nothing is stored when goodServers is null, in
 +   * which case the iterator's server set stays null.
 +   */
 +  public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
 +    if (goodServers != null) {
 +      List<String> servers = new ArrayList<String>();
 +      for (TServerInstance server : goodServers)
 +        servers.add(server.toString());
 +      cfg.addOption(SERVERS_OPTION, StringUtil.join(servers, ","));
 +    }
 +  }
 +  
 +  /**
 +   * Stores the online tables in the iterator configuration as a comma separated list. Nothing is stored when onlineTables is null.
 +   */
 +  public static void setOnlineTables(IteratorSetting cfg, Set<String> onlineTables) {
 +    if (onlineTables != null)
 +      cfg.addOption(TABLES_OPTION, StringUtil.join(onlineTables, ","));
 +  }
 +  
 +  /**
 +   * Serializes the given merges into the iterator configuration. Only merges with a non-null extent and a state other than MergeState.NONE are written; the
 +   * records are concatenated and stored Base64 encoded under the "merges" option, the format parseMerges reads back.
 +   * 
 +   * @throws RuntimeException
 +   *           wrapping any failure while writing the records
 +   */
 +  public static void setMerges(IteratorSetting cfg, Collection<MergeInfo> merges) {
 +    DataOutputBuffer buffer = new DataOutputBuffer();
 +    try {
 +      for (MergeInfo info : merges) {
 +        KeyExtent extent = info.getExtent();
 +        if (extent != null && !info.getState().equals(MergeState.NONE)) {
 +          info.write(buffer);
 +        }
 +      }
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +    // getData() exposes the whole backing array, so only the first getLength() bytes are encoded
-     String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())));
++    String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())), Constants.UTF8);
 +    cfg.addOption(MERGES_OPTION, encoded);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
index bce6681,0000000..b0ed03f
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.io.IOException;
 +import java.util.List;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * DistributedStore backed by ZooKeeper. Reads go through a ZooCache, writes and deletes go through ZooReaderWriter and then clear the cache. All paths passed in
 + * are appended to a base path.
 + */
 +public class ZooStore implements DistributedStore {
 +  
 +  private static final Logger log = Logger.getLogger(ZooStore.class);
 +  
 +  // prefix prepended to every path; stored without a trailing "/"
 +  String basePath;
 +  
 +  ZooCache cache = new ZooCache();
 +  
 +  /**
 +   * @param basePath
 +   *          prefix for all paths; a single trailing "/" is removed
 +   */
 +  public ZooStore(String basePath) throws IOException {
 +    if (basePath.endsWith("/"))
 +      basePath = basePath.substring(0, basePath.length() - 1);
 +    this.basePath = basePath;
 +  }
 +  
 +  /**
 +   * Uses the ZooKeeper root of the instance returned by HdfsZooInstance as the base path.
 +   */
 +  public ZooStore() throws IOException {
 +    this(ZooUtil.getRoot(HdfsZooInstance.getInstance().getInstanceID()));
 +  }
 +  
 +  /**
 +   * Reads the data stored at the given path (relative to the base path) through the cache.
 +   * 
 +   * @throws DistributedStoreException
 +   *           wrapping any failure of the lookup
 +   */
 +  @Override
 +  public byte[] get(String path) throws DistributedStoreException {
 +    try {
 +      return cache.get(relative(path));
 +    } catch (Exception ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +  
 +  // Plain concatenation: callers are expected to pass paths starting with "/" (basePath has no trailing slash).
 +  private String relative(String path) {
 +    return basePath + path;
 +  }
 +  
 +  /**
 +   * Lists the children of the given path (relative to the base path) through the cache.
 +   * 
 +   * @throws DistributedStoreException
 +   *           wrapping any failure of the lookup
 +   */
 +  @Override
 +  public List<String> getChildren(String path) throws DistributedStoreException {
 +    try {
 +      return cache.getChildren(relative(path));
 +    } catch (Exception ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Writes the data as a persistent node, overwriting an existing node, then clears the whole cache so later reads see the new value.
 +   * 
 +   * @throws DistributedStoreException
 +   *           wrapping any failure of the write
 +   */
 +  @Override
 +  public void put(String path, byte[] bs) throws DistributedStoreException {
 +    try {
 +      path = relative(path);
 +      ZooReaderWriter.getInstance().putPersistentData(path, bs, NodeExistsPolicy.OVERWRITE);
 +      cache.clear();
-       log.debug("Wrote " + new String(bs) + " to " + path);
++      log.debug("Wrote " + new String(bs, Constants.UTF8) + " to " + path);
 +    } catch (Exception ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Recursively deletes the node at the given path if it exists, then clears the whole cache.
 +   * 
 +   * @throws DistributedStoreException
 +   *           wrapping any failure of the delete
 +   */
 +  @Override
 +  public void remove(String path) throws DistributedStoreException {
 +    try {
 +      log.debug("Removing " + path);
 +      path = relative(path);
 +      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +      if (zoo.exists(path))
 +        zoo.recursiveDelete(path, NodeMissingPolicy.SKIP);
 +      cache.clear();
 +    } catch (Exception ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index c792917,0000000..bbd1257
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@@ -1,174 -1,0 +1,175 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.List;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.commons.lang.NotImplementedException;
 +import org.apache.log4j.Logger;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class ZooTabletStateStore extends TabletStateStore {
 +  
 +  private static final Logger log = Logger.getLogger(ZooTabletStateStore.class);
 +  final private DistributedStore store;
 +  
 +  /**
 +   * Creates a state store that reads and writes the root tablet location through the given store.
 +   */
 +  public ZooTabletStateStore(DistributedStore store) {
 +    this.store = store;
 +  }
 +  
 +  /**
 +   * Creates a state store backed by a default ZooStore.
 +   * 
 +   * @throws DistributedStoreException
 +   *           wrapping an IOException raised while creating the ZooStore
 +   */
 +  public ZooTabletStateStore() throws DistributedStoreException {
 +    try {
 +      store = new ZooStore();
 +    } catch (IOException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Returns an iterator over the single root tablet. hasNext() is true until next() has been called once. next() reads the future, current and last locations
 +   * and the write-ahead log entries from the store each time it is called; it does not throw once exhausted.
 +   */
 +  @Override
 +  public Iterator<TabletLocationState> iterator() {
 +    return new Iterator<TabletLocationState>() {
 +      boolean finished = false;
 +      
 +      @Override
 +      public boolean hasNext() {
 +        return !finished;
 +      }
 +      
 +      @Override
 +      public TabletLocationState next() {
 +        finished = true;
 +        try {
 +          byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
 +          byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION);
 +          byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION);
 +          
 +          TServerInstance currentSession = null;
 +          TServerInstance futureSession = null;
 +          TServerInstance lastSession = null;
 +          
 +          if (future != null)
 +            futureSession = parse(future);
 +          
 +          if (last != null)
 +            lastSession = parse(last);
 +          
 +          // a stored current location takes precedence: any future location is dropped from the result
 +          if (current != null) {
 +            currentSession = parse(current);
 +            futureSession = null;
 +          }
 +          List<Collection<String>> logs = new ArrayList<Collection<String>>();
 +          for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
 +            byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
 +            if (logInfo != null) {
 +              LogEntry logEntry = new LogEntry();
 +              logEntry.fromBytes(logInfo);
 +              logs.add(logEntry.logSet);
 +              log.debug("root tablet logSet " + logEntry.logSet);
 +            }
 +          }
 +          TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
 +          log.debug("Returning root tablet state: " + result);
 +          return result;
 +        } catch (Exception ex) {
 +          // the Iterator interface allows no checked exceptions
 +          throw new RuntimeException(ex);
 +        }
 +      }
 +      
 +      @Override
 +      public void remove() {
 +        throw new NotImplementedException();
 +      }
 +    };
 +  }
 +  
 +  /**
 +   * Decodes a stored location of the form "host:port|session".
 +   * 
 +   * @param current
 +   *          the raw bytes read from the store
 +   * @return the server instance, or null when the value carries no session part
 +   */
 +  protected TServerInstance parse(byte[] current) {
-     String str = new String(current);
++    String str = new String(current, Constants.UTF8);
 +    // split on the first '|' only: parts[0] is the address, parts[1] (if present) the session
 +    String[] parts = str.split("[|]", 2);
 +    HostAndPort address = HostAndPort.fromString(parts[0]);
 +    if (parts.length > 1 && parts[1] != null && parts[1].length() > 0) {
 +      return new TServerInstance(address, parts[1]);
 +    } else {
 +      // a 1.2 location specification: DO NOT WANT
 +      return null;
 +    }
 +  }
 +  
 +  /**
 +   * Records the future location of the root tablet as "location|session".
 +   * 
 +   * @param assignments
 +   *          exactly one assignment, which must be for the root tablet
 +   * @throws IllegalArgumentException
 +   *           if there is not exactly one assignment or it is not for the root tablet
 +   * @throws DistributedStoreException
 +   *           if the root tablet already has a current location, or the store fails
 +   */
 +  @Override
 +  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
 +    if (assignments.size() != 1)
 +      throw new IllegalArgumentException("There is only one root tablet");
 +    Assignment assignment = assignments.iterator().next();
 +    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
 +      throw new IllegalArgumentException("You can only store the root tablet location");
 +    String value = assignment.server.getLocation() + "|" + assignment.server.getSession();
 +    // re-read the stored state to make sure the tablet is not already hosted somewhere
 +    Iterator<TabletLocationState> currentIter = iterator();
 +    TabletLocationState current = currentIter.next();
 +    if (current.current != null) {
 +      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
 +    }
-     store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
++    store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes(Constants.UTF8));
 +  }
 +  
 +  @Override
 +  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
 +    if (assignments.size() != 1)
 +      throw new IllegalArgumentException("There is only one root tablet");
 +    Assignment assignment = assignments.iterator().next();
 +    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
 +      throw new IllegalArgumentException("You can only store the root tablet location");
 +    String value = assignment.server.getLocation() + "|" + assignment.server.getSession();
 +    Iterator<TabletLocationState> currentIter = iterator();
 +    TabletLocationState current = currentIter.next();
 +    if (current.current != null) {
 +      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
 +    }
 +    if (!current.future.equals(assignment.server)) {
 +      throw new DistributedStoreException("Root tablet is already assigned to " + current.future);
 +    }
-     store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes());
-     store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
++    store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes(Constants.UTF8));
++    store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes(Constants.UTF8));
 +    // Make the following unnecessary by making the entire update atomic
 +    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
 +    log.debug("Put down root tablet location");
 +  }
 +  
 +  /**
 +   * Removes the current and future location of the root tablet from the store. The last location is left in place.
 +   * 
 +   * @param tablets
 +   *          exactly one tablet state, which must be for the root tablet
 +   * @throws IllegalArgumentException
 +   *           if there is not exactly one entry or it is not the root tablet
 +   */
 +  @Override
 +  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
 +    if (tablets.size() != 1)
 +      throw new IllegalArgumentException("There is only one root tablet");
 +    TabletLocationState tls = tablets.iterator().next();
 +    if (tls.extent.compareTo(RootTable.EXTENT) != 0)
 +      throw new IllegalArgumentException("You can only store the root tablet location");
 +    store.remove(RootTable.ZROOT_TABLET_LOCATION);
 +    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
 +    log.debug("unassign root tablet location");
 +  }
 +  
 +  /**
 +   * @return the fixed display name of this store, "Root Table"
 +   */
 +  @Override
 +  public String name() {
 +    return "Root Table";
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
index 9735371,0000000..cdcdfba
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@@ -1,272 -1,0 +1,276 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.metrics;
 +
 +import java.io.File;
- import java.io.FileWriter;
++import java.io.FileOutputStream;
 +import java.io.IOException;
++import java.io.OutputStreamWriter;
++import java.io.Writer;
 +import java.lang.management.ManagementFactory;
 +import java.text.SimpleDateFormat;
 +import java.util.Date;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +import javax.management.MBeanServer;
 +import javax.management.ObjectName;
 +import javax.management.StandardMBean;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.commons.lang.builder.ToStringBuilder;
 +import org.apache.commons.lang.time.DateUtils;
 +
 +public abstract class AbstractMetricsImpl {
 +  
 +  /**
 +   * Running statistics for one named metric: a sample count, a smoothed average, and the smallest and largest sample seen. Negative samples are ignored by the
 +   * average, minimum and maximum. All values read as 0 until samples arrive. Instances are not synchronized.
 +   */
 +  public class Metric {
 +    
 +    private long count = 0;
 +    private long avg = 0;
 +    private long min = 0;
 +    private long max = 0;
 +    // false until the first sample reaches addMin(); without it the initial 0 would always win Math.min and the minimum could never be reported
 +    private boolean minRecorded = false;
 +    
 +    public long getCount() {
 +      return count;
 +    }
 +    
 +    public long getAvg() {
 +      return avg;
 +    }
 +    
 +    public long getMin() {
 +      return min;
 +    }
 +    
 +    public long getMax() {
 +      return max;
 +    }
 +    
 +    public void incCount() {
 +      count++;
 +    }
 +    
 +    /**
 +     * Folds a sample into the average as an exponentially weighted mean: 80% previous value, 20% new sample.
 +     */
 +    public void addAvg(long a) {
 +      if (a < 0)
 +        return;
 +      avg = (long) ((avg * .8) + (a * .2));
 +    }
 +    
 +    /**
 +     * Records a sample for the minimum. The first non-negative sample becomes the minimum; later samples only lower it.
 +     */
 +    public void addMin(long a) {
 +      if (a < 0)
 +        return;
 +      min = minRecorded ? Math.min(min, a) : a;
 +      minRecorded = true;
 +    }
 +    
 +    /**
 +     * Records a sample for the maximum.
 +     */
 +    public void addMax(long a) {
 +      if (a < 0)
 +        return;
 +      max = Math.max(max, a);
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString();
 +    }
 +    
 +  }
 +  
 +  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class);
 +  
 +  // static: one registry shared by every metrics implementation in the JVM, keyed by metric name
 +  private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>();
 +  
 +  // true once setupLogging() has run with logging enabled
 +  private boolean currentlyLogging = false;
 +  
 +  private File logDir = null;
 +  
 +  private String metricsPrefix = null;
 +  
 +  // day the current log file belongs to; compared against "now" to roll the file over
 +  private Date today = new Date();
 +  
 +  private File logFile = null;
 +  
-   private FileWriter logWriter = null;
++  private Writer logWriter = null;
 +  
 +  // date suffix of the log file name
 +  private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
 +  
 +  // timestamp prefix of each log line
 +  // NOTE(review): "hh" is the 12-hour clock and the pattern has no am/pm marker, so afternoon timestamps are ambiguous - confirm whether "HH" was intended
 +  private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz");
 +  
 +  private MetricsConfiguration config = null;
 +  
 +  /**
 +   * Loads the metrics configuration for the prefix supplied by the subclass. Note that this calls the overridable getMetricsPrefix() before the subclass
 +   * constructor body has run.
 +   */
 +  public AbstractMetricsImpl() {
 +    this.metricsPrefix = getMetricsPrefix();
 +    config = new MetricsConfiguration(metricsPrefix);
 +  }
 +  
 +  /**
 +   * Registers a StandardMBean with the platform MBean server under getObjectName(), then sets up metrics logging.
 +   * 
 +   * @param mbean
 +   *          the bean to register
 +   * @throws Exception
 +   *           IllegalArgumentException if getObjectName() returns null; otherwise whatever registration or log setup throws
 +   */
 +  public void register(StandardMBean mbean) throws Exception {
 +    // Register this object with the MBeanServer
 +    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +    if (null == getObjectName())
 +      throw new IllegalArgumentException("MBean object name must be set.");
 +    mbs.registerMBean(mbean, getObjectName());
 +    
 +    setupLogging();
 +  }
 +  
 +  /**
 +   * Registers this object itself with the platform MBean server under getObjectName(), then sets up metrics logging.
 +   * 
 +   * @throws Exception
 +   *           IllegalArgumentException if getObjectName() returns null; otherwise whatever registration or log setup throws
 +   */
 +  public void register() throws Exception {
 +    // Register this object with the MBeanServer
 +    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +    if (null == getObjectName())
 +      throw new IllegalArgumentException("MBean object name must be set.");
 +    mbs.registerMBean(this, getObjectName());
 +    setupLogging();
 +  }
 +  
 +  /**
 +   * Puts a fresh, zeroed Metric into the shared registry under the given name, replacing any metric already stored there.
 +   */
 +  public void createMetric(String name) {
 +    registry.put(name, new Metric());
 +  }
 +  
 +  /**
 +   * @return the metric registered under the given name, or null if there is none
 +   */
 +  public Metric getMetric(String name) {
 +    return registry.get(name);
 +  }
 +  
 +  /**
 +   * @return the sample count of the named metric
 +   * @throws NullPointerException
 +   *           if no metric is registered under the name
 +   */
 +  public long getMetricCount(String name) {
 +    return registry.get(name).getCount();
 +  }
 +  
 +  /**
 +   * @return the smoothed average of the named metric
 +   * @throws NullPointerException
 +   *           if no metric is registered under the name
 +   */
 +  public long getMetricAvg(String name) {
 +    return registry.get(name).getAvg();
 +  }
 +  
 +  /**
 +   * @return the minimum of the named metric
 +   * @throws NullPointerException
 +   *           if no metric is registered under the name
 +   */
 +  public long getMetricMin(String name) {
 +    return registry.get(name).getMin();
 +  }
 +  
 +  /**
 +   * @return the maximum of the named metric
 +   * @throws NullPointerException
 +   *           if no metric is registered under the name
 +   */
 +  public long getMetricMax(String name) {
 +    return registry.get(name).getMax();
 +  }
 +  
 +  /**
 +   * Starts metrics logging if it is not already running and the "&lt;prefix&gt;.logging" setting is true. The directory named by "logging.dir" is created when
 +   * missing and a log file is opened in it. Does nothing when no metrics configuration is available.
 +   * 
 +   * @throws IOException
 +   *           if the log file cannot be opened
 +   */
 +  private void setupLogging() throws IOException {
 +    if (null == config.getMetricsConfiguration())
 +      return;
 +    // If we are already logging, then return
 +    if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +      // Check to see if directory exists, else make it
 +      String mDir = config.getMetricsConfiguration().getString("logging.dir");
 +      if (null != mDir) {
 +        File dir = new File(mDir);
 +        if (!dir.isDirectory())
-           dir.mkdir();
++          if (!dir.mkdir()) 
++            log.warn("Could not create log directory: " + dir);
 +        logDir = dir;
 +        // Create new log file
 +        startNewLog();
 +      }
 +      // set even when no directory is configured (logWriter stays null) and regardless of what startNewLog() decided
 +      currentlyLogging = true;
 +    }
 +  }
 +  
 +  /**
 +   * Closes the current log writer, if any, and opens "&lt;prefix&gt;-&lt;yyyyMMdd&gt;.log" in the log directory for appending, creating the file when needed. If
 +   * the file cannot be created, logging is switched off and no new writer is opened.
 +   * 
 +   * @throws IOException
 +   *           if closing the old writer or opening the new file fails
 +   */
 +  private void startNewLog() throws IOException {
 +    if (null != logWriter) {
 +      logWriter.flush();
 +      logWriter.close();
 +    }
 +    logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log");
 +    if (!logFile.exists()) {
 +      if (!logFile.createNewFile()) {
 +        log.error("Unable to create new log file");
 +        currentlyLogging = false;
 +        return;
 +      }
 +    }
-     logWriter = new FileWriter(logFile, true);
++    logWriter = new OutputStreamWriter(new FileOutputStream(logFile, true), Constants.UTF8);
 +  }
 +  
 +  /**
 +   * Appends one line with the current timestamp and the state of the named metric to the log file, rolling over to a new file first when the day has changed.
 +   * Does nothing when no log writer is open. The writer is not flushed here.
 +   * 
 +   * @throws IOException
 +   *           if rolling over or writing fails
 +   */
 +  private void writeToLog(String name) throws IOException {
 +    if (null == logWriter)
 +      return;
 +    // Increment the date if we have to
 +    Date now = new Date();
 +    if (!DateUtils.isSameDay(today, now)) {
 +      today = now;
 +      startNewLog();
 +    }
 +    logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n");
 +  }
 +  
 +  /**
 +   * Records one sample for the named metric when metrics are enabled: bumps the count and feeds the value to the average, minimum and maximum. It then brings
 +   * logging in line with the current "&lt;prefix&gt;.logging" setting (starting or stopping it as needed) and, if logging is on, writes the metric to the log.
 +   * Logging failures are logged and otherwise ignored.
 +   * 
 +   * @param name
 +   *          name of a metric previously registered with createMetric; an unknown name causes a NullPointerException
 +   * @param time
 +   *          the sample value; negative values only affect the count
 +   */
 +  public void add(String name, long time) {
 +    if (isEnabled()) {
 +      registry.get(name).incCount();
 +      registry.get(name).addAvg(time);
 +      registry.get(name).addMin(time);
 +      registry.get(name).addMax(time);
 +      // If we are not currently logging and should be, then initialize
 +      if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +        try {
 +          setupLogging();
 +        } catch (IOException ioe) {
 +          log.error("Error setting up log", ioe);
 +        }
 +      } else if (currentlyLogging && !config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +        // if we are currently logging and shouldn't be, then close logs
 +        try {
 +          // logWriter can be null here (logging on without a log directory); the resulting exception is caught and logged below
 +          logWriter.flush();
 +          logWriter.close();
 +          logWriter = null;
 +          logFile = null;
 +        } catch (Exception e) {
 +          log.error("Error stopping metrics logging", e);
 +        }
 +        currentlyLogging = false;
 +      }
 +      if (currentlyLogging) {
 +        try {
 +          writeToLog(name);
 +        } catch (IOException ioe) {
 +          log.error("Error writing to metrics log", ioe);
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * @return whether metrics collection is enabled, as reported by the metrics configuration
 +   */
 +  public boolean isEnabled() {
 +    return config.isEnabled();
 +  }
 +  
 +  /**
 +   * @return the JMX object name to register under; the register methods reject a null value with an IllegalArgumentException
 +   */
 +  protected abstract ObjectName getObjectName();
 +  
 +  /**
 +   * @return the prefix used to look up this component's settings and to name its log file. Called from the AbstractMetricsImpl constructor, so implementations
 +   *         must not depend on subclass state that is initialized later.
 +   */
 +  protected abstract String getMetricsPrefix();
 +  
 +  /**
 +   * Best-effort cleanup: closes the log writer, if one is open, and drops the references to the writer and the log file. Errors while closing are ignored.
 +   */
 +  @Override
 +  protected void finalize() {
 +    if (null != logWriter) {
 +      try {
 +        logWriter.close();
 +      } catch (Exception e) {
 +        // do nothing
 +      } finally {
 +        logWriter = null;
 +      }
 +    }
 +    logFile = null;
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index fd83e97,0000000..4df585d
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@@ -1,158 -1,0 +1,158 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.monitor;
 +
 +import java.io.IOException;
 +import java.net.ServerSocket;
 +import java.net.Socket;
 +import java.util.ArrayList;
 +import java.util.LinkedHashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.log4j.LogManager;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.net.SocketNode;
 +import org.apache.log4j.spi.LoggingEvent;
 +
 +/**
 + * Hijack log4j and capture log events for display.
 + * 
 + */
 +public class LogService extends org.apache.log4j.AppenderSkeleton {
 +
 +  private static final Logger log = Logger.getLogger(LogService.class);
 +
 +  /**
 +   * Read logging events forward to us over the net.
 +   * 
 +   */
 +  static class SocketServer implements Runnable {
 +    private ServerSocket server = null;
 +
 +    public SocketServer(int port) {
 +      try {
 +        server = new ServerSocket(port);
 +      } catch (IOException io) {
 +        throw new RuntimeException(io);
 +      }
 +    }
 +
 +    public int getLocalPort() {
 +      return server.getLocalPort();
 +    }
 +
 +    @Override
 +    public void run() {
 +      try {
 +        while (true) {
 +          log.debug("Waiting for log message senders");
 +          Socket socket = server.accept();
 +          log.debug("Got a new connection");
 +          Thread t = new Daemon(new SocketNode(socket, LogManager.getLoggerRepository()));
 +          t.start();
 +        }
 +      } catch (IOException io) {
 +        log.error(io, io);
 +      }
 +    }
 +  }
 +
 +  public static void startLogListener(AccumuloConfiguration conf, String instanceId) {
 +    try {
 +      SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
-           Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE);
++          Integer.toString(server.getLocalPort()).getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE);
 +      new Daemon(server).start();
 +    } catch (Throwable t) {
 +      log.info("Unable to listen to cluster-wide ports", t);
 +    }
 +  }
 +
 +  static private LogService instance = null;
 +
 +  synchronized public static LogService getInstance() {
 +    if (instance == null)
 +      return new LogService();
 +    return instance;
 +  }
 +
 +  private static final int MAX_LOGS = 50;
 +
 +  private LinkedHashMap<String,DedupedLogEvent> events = new LinkedHashMap<String,DedupedLogEvent>(MAX_LOGS + 1, (float) .75, true) {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    @SuppressWarnings("rawtypes")
 +    protected boolean removeEldestEntry(Map.Entry eldest) {
 +      return size() > MAX_LOGS;
 +    }
 +  };
 +
 +  public LogService() {
 +    synchronized (LogService.class) {
 +      instance = this;
 +    }
 +  }
 +
 +  @Override
 +  synchronized protected void append(LoggingEvent ev) {
 +    Object application = ev.getMDC("application");
 +    if (application == null || application.toString().isEmpty())
 +      return;
 +
 +    DedupedLogEvent dev = new DedupedLogEvent(ev);
 +
 +    // if event is present, increase the count
 +    if (events.containsKey(dev.toString())) {
 +      DedupedLogEvent oldDev = events.remove(dev.toString());
 +      dev.setCount(oldDev.getCount() + 1);
 +    }
 +    events.put(dev.toString(), dev);
 +  }
 +
 +  @Override
 +  public void close() {
 +    events = null;
 +  }
 +
 +  @Override
 +  public synchronized void doAppend(LoggingEvent event) {
 +    super.doAppend(event);
 +  }
 +
 +  @Override
 +  public boolean requiresLayout() {
 +    return false;
 +  }
 +
 +  synchronized public List<DedupedLogEvent> getEvents() {
 +    return new ArrayList<DedupedLogEvent>(events.values());
 +  }
 +
 +  synchronized public void clear() {
 +    events.clear();
 +  }
 +}


Mime
View raw message