accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [11/14] accumulo git commit: Merge branch '1.5' into 1.6
Date Tue, 23 Dec 2014 20:04:16 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 5385657,0000000..a90cc61
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -1,825 -1,0 +1,825 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor;
 +
 +import static com.google.common.base.Charsets.UTF_8;
 +
 +import java.net.InetAddress;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.MasterClient;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.gc.thrift.GCMonitorService;
 +import org.apache.accumulo.core.gc.thrift.GCStatus;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
- import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.monitor.servlets.DefaultServlet;
 +import org.apache.accumulo.monitor.servlets.GcStatusServlet;
 +import org.apache.accumulo.monitor.servlets.JSONServlet;
 +import org.apache.accumulo.monitor.servlets.LogServlet;
 +import org.apache.accumulo.monitor.servlets.MasterServlet;
 +import org.apache.accumulo.monitor.servlets.OperationServlet;
 +import org.apache.accumulo.monitor.servlets.ProblemServlet;
 +import org.apache.accumulo.monitor.servlets.ScanServlet;
 +import org.apache.accumulo.monitor.servlets.ShellServlet;
 +import org.apache.accumulo.monitor.servlets.TServersServlet;
 +import org.apache.accumulo.monitor.servlets.TablesServlet;
 +import org.apache.accumulo.monitor.servlets.VisServlet;
 +import org.apache.accumulo.monitor.servlets.XMLServlet;
 +import org.apache.accumulo.monitor.servlets.trace.ListType;
 +import org.apache.accumulo.monitor.servlets.trace.ShowTrace;
 +import org.apache.accumulo.monitor.servlets.trace.Summary;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.monitor.LogService;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.problems.ProblemType;
 +import org.apache.accumulo.server.security.SystemCredentials;
++import org.apache.accumulo.server.security.SecurityUtil;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.TableInfoUtil;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Serve master statistics with an embedded web server.
 + */
 +public class Monitor {
 +  private static final Logger log = Logger.getLogger(Monitor.class);
 +
 +  private static final int REFRESH_TIME = 5; // seconds; fetchData() multiplies by 1000 before comparing against millisecond timestamps
 +  private static long lastRecalc = 0L; // currentTimeMillis() captured at the start of the most recent fetchData() pass, stored when that pass ends
 +  private static double totalIngestRate = 0.0; // sum of summary.ingestRate over all tablet servers, published by fetchData()
 +  private static double totalIngestByteRate = 0.0; // summed ingestByteRate divided by 1000000.0 in fetchData()
 +  private static double totalQueryRate = 0.0; // sum of summary.queryRate over all tablet servers
 +  private static double totalScanRate = 0.0; // sum of summary.scanRate over all tablet servers
 +  private static double totalQueryByteRate = 0.0; // summed queryByteRate divided by 1000000.0 in fetchData()
 +  private static long totalEntries = 0L; // sum of summary.recs over all tablet servers
 +  private static int totalTabletCount = 0; // sum of tInfo.tablets over mmi.tableMap
 +  private static int onlineTabletCount = 0; // sum of tInfo.onlineTablets over mmi.tableMap
 +  private static long totalHoldTime = 0; // sum of server.holdTime over all tablet servers
 +  private static long totalLookups = 0; // sum of server.lookups over all tablet servers
 +  private static int totalTables = 0; // number of entries in mmi.tableMap at the last fetch
 +
 +  /**
 +   * A list of (timestamp, value) samples that keeps only the samples falling within a fixed time window of the newest one.
 +   * <p>
 +   * Eviction happens only in {@link #add(Object)}; other mutators inherited from {@link LinkedList} are not windowed. Assumes samples are appended in
 +   * non-decreasing timestamp order (fetchData() appends with the current wall-clock time) -- TODO confirm no other writer exists.
 +   */
 +  private static class MaxList<T> extends LinkedList<Pair<Long,T>> {
 +    private static final long serialVersionUID = 1L;
 +
 +    /** Largest allowed difference between the newest and the oldest retained timestamp. */
 +    private final long maxDelta;
 +
 +    public MaxList(long maxDelta) {
 +      this.maxDelta = maxDelta;
 +    }
 +
 +    /**
 +     * Appends the sample and then drops every sample at the head of the list that is more than {@code maxDelta} older than it.
 +     *
 +     * @param obj
 +     *          the (timestamp, value) sample to append
 +     * @return the result of {@link LinkedList#add(Object)}
 +     */
 +    @Override
 +    public boolean add(Pair<Long,T> obj) {
 +      boolean result = super.add(obj);
 +
 +      // Evict all stale heads, not just one: after a pause in sampling several old entries can be outside the window at once.
 +      while (!isEmpty() && obj.getFirst() - getFirst().getFirst() > maxDelta)
 +        removeFirst();
 +
 +      return result;
 +    }
 +
 +  }
 +
 +  private static final int MAX_TIME_PERIOD = 60 * 60 * 1000; // one hour in milliseconds; passed to every MaxList below as its maxDelta
 +  private static final List<Pair<Long,Double>> loadOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> ingestRateOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> ingestByteRateOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Integer>> recoveriesOverTime = Collections.synchronizedList(new
MaxList<Integer>(MAX_TIME_PERIOD)); // NOTE(review): no add() to this list is visible in this class -- confirm it is populated elsewhere
 +  private static final List<Pair<Long,Integer>> minorCompactionsOverTime = Collections.synchronizedList(new
MaxList<Integer>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Integer>> majorCompactionsOverTime = Collections.synchronizedList(new
MaxList<Integer>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> lookupsOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Integer>> queryRateOverTime = Collections.synchronizedList(new
MaxList<Integer>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Integer>> scanRateOverTime = Collections.synchronizedList(new
MaxList<Integer>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> queryByteRateOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> indexCacheHitRateOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static final List<Pair<Long,Double>> dataCacheHitRateOverTime = Collections.synchronizedList(new
MaxList<Double>(MAX_TIME_PERIOD));
 +  private static EventCounter lookupRateTracker = new EventCounter(); // fed per tablet server in fetchData(); turned into a rate via calculateRate()
 +  private static EventCounter indexCacheHitTracker = new EventCounter(); // the four cache trackers are combined pairwise in calcCacheHitRate()
 +  private static EventCounter indexCacheRequestTracker = new EventCounter();
 +  private static EventCounter dataCacheHitTracker = new EventCounter();
 +  private static EventCounter dataCacheRequestTracker = new EventCounter();
 +
 +  private static volatile boolean fetching = false; // true while one thread is inside fetchData(); toggled under the Monitor.class lock
 +  private static MasterMonitorInfo mmi; // latest result of getMasterStats(); set to null when a fetch attempt fails
 +  private static Map<String,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap(); // replaced at the end of each fetchData() pass
 +  private static Exception problemException; // failure from the last ProblemReports summarize() call, or null if it succeeded
 +  private static GCStatus gcStatus; // latest result of fetchGcStatus(); null when the collector could not be reached
 +
 +  private static Instance instance; // assigned in main()
 +
 +  private static ServerConfiguration config; // assigned in main()
 +
 +  private static EmbeddedWebServer server; // created in run()
 +
 +  private ZooLock monitorLock; // created in getMonitorLock()
 +
 +  /**
 +   * Tracks, per tablet server, the two most recent distinct (sampleTime, cumulativeEventCount) samples so that a rate or a delta between them can be
 +   * computed.
 +   * <p>
 +   * Usage, as seen in fetchData(): call {@link #startingUpdates()}, then {@link #updateTabletServer} once per live server, then
 +   * {@link #finishedUpdating()}. This class does no locking of its own.
 +   */
 +  private static class EventCounter {
 +
 +    // sample preceding the current one, keyed by tablet server name
 +    Map<String,Pair<Long,Long>> prevSamples = new HashMap<String,Pair<Long,Long>>();
 +    // most recent distinct sample, keyed by tablet server name
 +    Map<String,Pair<Long,Long>> samples = new HashMap<String,Pair<Long,Long>>();
 +    // servers reported during the update round in progress
 +    Set<String> serversUpdated = new HashSet<String>();
 +
 +    /** Begins a new update round by forgetting which servers were seen in the previous one. */
 +    void startingUpdates() {
 +      serversUpdated.clear();
 +    }
 +
 +    /**
 +     * Records a sample for one tablet server. A sample identical to the stored one is ignored, so the previous/current pair always spans two
 +     * distinct samples.
 +     *
 +     * @param name
 +     *          tablet server name used as the map key
 +     * @param sampleTime
 +     *          timestamp of the sample, in milliseconds (calculateRate() divides the difference by 1000)
 +     * @param numEvents
 +     *          cumulative event count at that time
 +     */
 +    void updateTabletServer(String name, long sampleTime, long numEvents) {
 +      Pair<Long,Long> newSample = new Pair<Long,Long>(sampleTime, numEvents);
 +      Pair<Long,Long> lastSample = samples.get(name);
 +
 +      if (lastSample == null || !lastSample.equals(newSample)) {
 +        samples.put(name, newSample);
 +        if (lastSample != null) {
 +          prevSamples.put(name, lastSample);
 +        }
 +      }
 +      serversUpdated.add(name);
 +    }
 +
 +    /** Ends an update round, dropping state for every server that was not reported during it. */
 +    void finishedUpdating() {
 +      // remove any tablet servers not updated
 +      samples.keySet().retainAll(serversUpdated);
 +      prevSamples.keySet().retainAll(serversUpdated);
 +    }
 +
 +    /**
 +     * Sums the per-server event rates between the previous and the current sample.
 +     *
 +     * @return total events per second across all servers that have two usable samples
 +     */
 +    double calculateRate() {
 +      double totalRate = 0;
 +
 +      for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
 +        Pair<Long,Long> prevSample = entry.getValue();
 +        Pair<Long,Long> sample = samples.get(entry.getKey());
 +
 +        long elapsedMillis = sample.getFirst() - prevSample.getFirst();
 +        // Two samples can differ in count while sharing a timestamp (or arrive out of order); dividing by a zero or negative interval would
 +        // put Infinity/NaN or a negative rate into the total, so such a server contributes nothing this round.
 +        if (elapsedMillis <= 0)
 +          continue;
 +
 +        totalRate += (sample.getSecond() - prevSample.getSecond()) / (elapsedMillis / (double) 1000);
 +      }
 +
 +      return totalRate;
 +    }
 +
 +    /**
 +     * Sums the per-server increase in event count between the previous and the current sample.
 +     *
 +     * @return total number of events across all servers that have two samples
 +     */
 +    long calculateCount() {
 +      long count = 0;
 +
 +      for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
 +        Pair<Long,Long> prevSample = entry.getValue();
 +        Pair<Long,Long> sample = samples.get(entry.getKey());
 +
 +        count += sample.getSecond() - prevSample.getSecond();
 +      }
 +
 +      return count;
 +    }
 +  }
 +
 +  public static void fetchData() {
 +    double totalIngestRate = 0.;
 +    double totalIngestByteRate = 0.;
 +    double totalQueryRate = 0.;
 +    double totalQueryByteRate = 0.;
 +    double totalScanRate = 0.;
 +    long totalEntries = 0;
 +    int totalTabletCount = 0;
 +    int onlineTabletCount = 0;
 +    long totalHoldTime = 0;
 +    long totalLookups = 0;
 +    boolean retry = true;
 +
 +    // only recalc every so often
 +    long currentTime = System.currentTimeMillis();
 +    if (currentTime - lastRecalc < REFRESH_TIME * 1000)
 +      return;
 +
 +    synchronized (Monitor.class) {
 +      if (fetching)
 +        return;
 +      fetching = true;
 +    }
 +
 +    try {
 +      while (retry) {
 +        MasterClientService.Iface client = null;
 +        try {
 +          client = MasterClient.getConnection(HdfsZooInstance.getInstance());
 +          if (client != null) {
 +            mmi = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()));
 +            retry = false;
 +          } else {
 +            mmi = null;
 +          }
 +          Monitor.gcStatus = fetchGcStatus();
 +        } catch (Exception e) {
 +          mmi = null;
 +          log.info("Error fetching stats: " + e);
 +        } finally {
 +          if (client != null) {
 +            MasterClient.close(client);
 +          }
 +        }
 +        if (mmi == null)
 +          UtilWaitThread.sleep(1000);
 +      }
 +      if (mmi != null) {
 +        int majorCompactions = 0;
 +        int minorCompactions = 0;
 +
 +        lookupRateTracker.startingUpdates();
 +        indexCacheHitTracker.startingUpdates();
 +        indexCacheRequestTracker.startingUpdates();
 +        dataCacheHitTracker.startingUpdates();
 +        dataCacheRequestTracker.startingUpdates();
 +
 +        for (TabletServerStatus server : mmi.tServerInfo) {
 +          TableInfo summary = TableInfoUtil.summarizeTableStats(server);
 +          totalIngestRate += summary.ingestRate;
 +          totalIngestByteRate += summary.ingestByteRate;
 +          totalQueryRate += summary.queryRate;
 +          totalScanRate += summary.scanRate;
 +          totalQueryByteRate += summary.queryByteRate;
 +          totalEntries += summary.recs;
 +          totalHoldTime += server.holdTime;
 +          totalLookups += server.lookups;
 +          majorCompactions += summary.majors.running;
 +          minorCompactions += summary.minors.running;
 +          lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
 +          indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
 +          indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
 +          dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
 +          dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
 +        }
 +
 +        lookupRateTracker.finishedUpdating();
 +        indexCacheHitTracker.finishedUpdating();
 +        indexCacheRequestTracker.finishedUpdating();
 +        dataCacheHitTracker.finishedUpdating();
 +        dataCacheRequestTracker.finishedUpdating();
 +
 +        int totalTables = 0;
 +        for (TableInfo tInfo : mmi.tableMap.values()) {
 +          totalTabletCount += tInfo.tablets;
 +          onlineTabletCount += tInfo.onlineTablets;
 +          totalTables++;
 +        }
 +        Monitor.totalIngestRate = totalIngestRate;
 +        Monitor.totalTables = totalTables;
 +        totalIngestByteRate = totalIngestByteRate / 1000000.0;
 +        Monitor.totalIngestByteRate = totalIngestByteRate;
 +        Monitor.totalQueryRate = totalQueryRate;
 +        Monitor.totalScanRate = totalScanRate;
 +        totalQueryByteRate = totalQueryByteRate / 1000000.0;
 +        Monitor.totalQueryByteRate = totalQueryByteRate;
 +        Monitor.totalEntries = totalEntries;
 +        Monitor.totalTabletCount = totalTabletCount;
 +        Monitor.onlineTabletCount = onlineTabletCount;
 +        Monitor.totalHoldTime = totalHoldTime;
 +        Monitor.totalLookups = totalLookups;
 +
 +        ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
 +        ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
 +
 +        double totalLoad = 0.;
 +        for (TabletServerStatus status : mmi.tServerInfo) {
 +          if (status != null)
 +            totalLoad += status.osLoad;
 +        }
 +        loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
 +
 +        minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
 +        majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
 +
 +        lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
 +
 +        queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
 +        queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
 +
 +        scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
 +
 +        calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
 +        calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
 +      }
 +      try {
 +        Monitor.problemSummary = ProblemReports.getInstance().summarize();
 +        Monitor.problemException = null;
 +      } catch (Exception e) {
 +        log.info("Failed to obtain problem reports ", e);
 +        Monitor.problemSummary = Collections.emptyMap();
 +        Monitor.problemException = e;
 +      }
 +
 +    } finally {
 +      synchronized (Monitor.class) {
 +        fetching = false;
 +        lastRecalc = currentTime;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Appends a cache hit-ratio sample (hits divided by requests since the previous sample) to a history list.
 +   *
 +   * @param hitRate
 +   *          history list to append to
 +   * @param currentTime
 +   *          timestamp stored with the sample
 +   * @param cacheHits
 +   *          counter of cache hits
 +   * @param cacheReq
 +   *          counter of cache requests; when it reports no new requests a {@code null} value is recorded instead of a ratio
 +   */
 +  private static void calcCacheHitRate(List<Pair<Long,Double>> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
 +    long req = cacheReq.calculateCount();
 +    if (req > 0)
 +      // divide by the value that was just checked rather than recomputing it, so the guard and the divisor cannot disagree
 +      hitRate.add(new Pair<Long,Double>(currentTime, cacheHits.calculateCount() / (double) req));
 +    else
 +      hitRate.add(new Pair<Long,Double>(currentTime, null));
 +  }
 +
 +  private static GCStatus fetchGcStatus() {
 +    GCStatus result = null;
 +    HostAndPort address = null;
 +    try {
 +      // Read the gc location from its lock
 +      ZooReaderWriter zk = ZooReaderWriter.getInstance();
 +      String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK;
 +      List<String> locks = zk.getChildren(path, null);
 +      if (locks != null && locks.size() > 0) {
 +        Collections.sort(locks);
 +        address = new ServerServices(new String(zk.getData(path + "/" + locks.get(0), null),
UTF_8)).getAddress(Service.GC_CLIENT);
 +        GCMonitorService.Client client = ThriftUtil.getClient(new GCMonitorService.Client.Factory(),
address, config.getConfiguration());
 +        try {
 +          result = client.getStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
 +        } finally {
 +          ThriftUtil.returnClient(client);
 +        }
 +      }
 +    } catch (Exception ex) {
 +      log.warn("Unable to contact the garbage collector at " + address, ex);
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Entry point of the monitor process: logs in, parses the command line, initializes logging, configuration and tracing, then runs the monitor.
 +   *
 +   * @param args
 +   *          command-line arguments, parsed by {@link ServerOpts}
 +   */
 +  public static void main(String[] args) throws Exception {
 +    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +
 +    final String app = "monitor";
 +    ServerOpts serverOpts = new ServerOpts();
 +    serverOpts.parseArgs(app, args);
 +    final String bindAddress = serverOpts.getAddress();
 +
 +    Accumulo.setupLogging(app);
 +    VolumeManager volumes = VolumeManagerImpl.get();
 +    instance = HdfsZooInstance.getInstance();
 +    config = new ServerConfiguration(instance);
 +    Accumulo.init(volumes, config, app);
 +
 +    Monitor theMonitor = new Monitor();
 +    Accumulo.enableTracing(bindAddress, app);
 +    theMonitor.run(bindAddress);
 +  }
 +
 +  private static long START_TIME;
 +
 +  public void run(String hostname) {
 +    try {
 +      getMonitorLock();
 +    } catch (Exception e) {
 +      log.error("Failed to get Monitor ZooKeeper lock");
 +      throw new RuntimeException(e);
 +    }
 +
 +    Monitor.START_TIME = System.currentTimeMillis();
 +    int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
 +    try {
 +      log.debug("Creating monitor on port " + port);
 +      server = new EmbeddedWebServer(hostname, port);
 +    } catch (Throwable ex) {
 +      log.error("Unable to start embedded web server", ex);
 +      throw new RuntimeException(ex);
 +    }
 +
 +    server.addServlet(DefaultServlet.class, "/");
 +    server.addServlet(OperationServlet.class, "/op");
 +    server.addServlet(MasterServlet.class, "/master");
 +    server.addServlet(TablesServlet.class, "/tables");
 +    server.addServlet(TServersServlet.class, "/tservers");
 +    server.addServlet(ProblemServlet.class, "/problems");
 +    server.addServlet(GcStatusServlet.class, "/gc");
 +    server.addServlet(LogServlet.class, "/log");
 +    server.addServlet(XMLServlet.class, "/xml");
 +    server.addServlet(JSONServlet.class, "/json");
 +    server.addServlet(VisServlet.class, "/vis");
 +    server.addServlet(ScanServlet.class, "/scans");
 +    server.addServlet(Summary.class, "/trace/summary");
 +    server.addServlet(ListType.class, "/trace/listType");
 +    server.addServlet(ShowTrace.class, "/trace/show");
 +    if (server.isUsingSsl())
 +      server.addServlet(ShellServlet.class, "/shell");
 +    server.start();
 +
 +    try {
 +      hostname = InetAddress.getLocalHost().getHostName();
 +
 +      log.debug("Using " + hostname + " to advertise monitor location in ZooKeeper");
 +
 +      String monitorAddress = HostAndPort.fromParts(hostname, server.getPort()).toString();
 +
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR,
monitorAddress.getBytes(UTF_8),
 +          NodeExistsPolicy.OVERWRITE);
 +      log.info("Set monitor address in zookeeper to " + monitorAddress);
 +    } catch (Exception ex) {
 +      log.error("Unable to set monitor HTTP address in zookeeper", ex);
 +    }
 +
 +    if (null != hostname) {
 +      LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID(),
hostname);
 +    } else {
 +      log.warn("Not starting log4j listener as we could not determine address to use");
 +    }
 +
 +    new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
 +
 +    // need to regularly fetch data so plot data is updated
 +    new Daemon(new LoggingRunnable(log, new Runnable() {
 +
 +      @Override
 +      public void run() {
 +        while (true) {
 +          try {
 +            Monitor.fetchData();
 +          } catch (Exception e) {
 +            log.warn(e.getMessage(), e);
 +          }
 +
 +          UtilWaitThread.sleep(333);
 +        }
 +
 +      }
 +    }), "Data fetcher").start();
 +
 +    new Daemon(new LoggingRunnable(log, new Runnable() {
 +      @Override
 +      public void run() {
 +        while (true) {
 +          try {
 +            Monitor.fetchScans();
 +          } catch (Exception e) {
 +            log.warn(e.getMessage(), e);
 +          }
 +          UtilWaitThread.sleep(5000);
 +        }
 +      }
 +    }), "Scan scanner").start();
 +  }
 +
 +  /** Immutable summary of the scans that were active on one tablet server at the moment they were fetched. */
 +  public static class ScanStats {
 +    /** Number of active scans reported. */
 +    public final long scanCount;
 +    /** Largest {@code age} among the reported scans, or {@code null} when there was none to report. */
 +    public final Long oldestScan;
 +    /** {@code System.currentTimeMillis()} at construction time. */
 +    public final long fetched;
 +
 +    ScanStats(List<ActiveScan> active) {
 +      this.scanCount = active.size();
 +
 +      long maxAge = -1;
 +      for (ActiveScan activeScan : active) {
 +        if (activeScan.age > maxAge)
 +          maxAge = activeScan.age;
 +      }
 +
 +      if (maxAge < 0)
 +        this.oldestScan = null;
 +      else
 +        this.oldestScan = maxAge;
 +
 +      this.fetched = System.currentTimeMillis();
 +    }
 +  }
 +  // Latest scan statistics keyed by tablet server; guarded by synchronizing on the map itself.
 +  static final Map<String, ScanStats> allScans = new HashMap<String, ScanStats>();
 +
 +  /**
 +   * @return a sorted copy of the current per-server scan statistics, taken while holding the map's lock
 +   */
 +  public static Map<String, ScanStats> getScans() {
 +    final Map<String, ScanStats> sortedCopy;
 +    synchronized (allScans) {
 +      sortedCopy = new TreeMap<String, ScanStats>(allScans);
 +    }
 +    return sortedCopy;
 +  }
 +
 +  /**
 +   * Queries every tablet server for its active scans, stores a {@link ScanStats} summary per server in {@code allScans}, and then removes
 +   * summaries that were fetched more than five minutes ago. Does nothing until {@code instance} has been set.
 +   *
 +   * @throws Exception
 +   *           if connecting to the instance or listing the tablet servers fails; a failure on an individual server is only logged
 +   */
 +  protected static void fetchScans() throws Exception {
 +    if (instance == null)
 +      return;
 +    Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
 +    for (String server : c.instanceOperations().getTabletServers()) {
 +      Client tserver = ThriftUtil.getTServerClient(server, Monitor.getSystemConfiguration());
 +      try {
 +        List<ActiveScan> scans = tserver.getActiveScans(null, SystemCredentials.get().toThrift(instance));
 +        synchronized (allScans) {
 +          allScans.put(server, new ScanStats(scans));
 +        }
 +      } catch (Exception ex) {
 +        log.debug(ex, ex);
 +      } finally {
 +        ThriftUtil.returnClient(tserver);
 +      }
 +    }
 +    // Age off old scan information
 +    long now = System.currentTimeMillis();
 +    // allScans is a plain HashMap shared with getScans(); iterate and remove under the same lock that guards put() and the copy in getScans()
 +    synchronized (allScans) {
 +      Iterator<Entry<String,ScanStats>> entryIter = allScans.entrySet().iterator();
 +      while (entryIter.hasNext()) {
 +        Entry<String,ScanStats> entry = entryIter.next();
 +        if (now - entry.getValue().fetched > 5 * 60 * 1000) {
 +          entryIter.remove();
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Get the monitor lock in ZooKeeper.
 +   * <p>
 +   * First makes sure the monitor node and its lock child exist with the expected (empty) contents, then loops until the lock is acquired, sleeping
 +   * for {@code MONITOR_LOCK_CHECK_INTERVAL} between attempts.
 +   *
 +   * @throws IllegalStateException
 +   *           if the lock watcher returns without reporting either success or failure
 +   */
 +  private void getMonitorLock() throws KeeperException, InterruptedException {
 +    final String zRoot = ZooUtil.getRoot(instance);
 +    final String monitorPath = zRoot + Constants.ZMONITOR;
 +    final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK;
 +
 +    // Ensure that everything is kosher with ZK as this has changed.
 +    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    if (!zoo.exists(monitorPath)) {
 +      // 1.5.0 and earlier: the monitor node is missing, so create it
 +      zoo.putPersistentData(monitorPath, new byte[0], NodeExistsPolicy.FAIL);
 +      if (!zoo.exists(monitorLockPath)) {
 +        // and create monitor/lock beneath it when that is missing as well
 +        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
 +      }
 +    } else {
 +      byte[] existing = zoo.getData(monitorPath, null);
 +      if (existing.length != 0) {
 +        // If the node isn't empty, it's from a previous install (has hostname:port for HTTP server):
 +        // recursively delete from that parent node
 +        zoo.recursiveDelete(monitorPath, NodeMissingPolicy.SKIP);
 +
 +        // And then make the nodes that we expect for the incoming ephemeral nodes
 +        zoo.putPersistentData(monitorPath, new byte[0], NodeExistsPolicy.FAIL);
 +        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
 +      } else if (!zoo.exists(monitorLockPath)) {
 +        // monitor node in ZK exists and is empty as we expect
 +        // but the monitor/lock node does not
 +        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
 +      }
 +    }
 +
 +    // Get a ZooLock for the monitor
 +    for (;;) {
 +      MoniterLockWatcher watcher = new MoniterLockWatcher();
 +      monitorLock = new ZooLock(monitorLockPath);
 +      monitorLock.lockAsync(watcher, new byte[0]);
 +
 +      watcher.waitForChange();
 +
 +      if (watcher.acquiredLock) {
 +        break;
 +      }
 +
 +      if (!watcher.failedToAcquireLock) {
 +        throw new IllegalStateException("monitor lock in unknown state");
 +      }
 +
 +      // this attempt failed: back out of it and wait before trying again
 +      monitorLock.tryToCancelAsyncLockOrUnlock();
 +
 +      UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.MONITOR_LOCK_CHECK_INTERVAL));
 +    }
 +
 +    log.info("Got Monitor lock.");
 +  }
 +
 +  /**
 +   * Async Watcher for monitor lock
 +   */
 +  private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher {
 +
 +    boolean acquiredLock = false;
 +    boolean failedToAcquireLock = false;
 +
 +    @Override
 +    public void lostLock(LockLossReason reason) {
 +      Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
 +    }
 +
 +    @Override
 +    public void unableToMonitorLockNode(final Throwable e) {
 +      Halt.halt(-1, new Runnable() {
 +        @Override
 +        public void run() {
 +          log.fatal("No longer able to monitor Monitor lock node", e);
 +        }
 +      });
 +
 +    }
 +
 +    @Override
 +    public synchronized void acquiredLock() {
 +      if (acquiredLock || failedToAcquireLock) {
 +        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock,
-1);
 +      }
 +
 +      acquiredLock = true;
 +      notifyAll();
 +    }
 +
 +    @Override
 +    public synchronized void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to get monitor lock " + e);
 +
 +      if (acquiredLock) {
 +        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock,
-1);
 +      }
 +
 +      failedToAcquireLock = true;
 +      notifyAll();
 +    }
 +
 +    public synchronized void waitForChange() {
 +      while (!acquiredLock && !failedToAcquireLock) {
 +        try {
 +          wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +  }
 +
 +  /** @return the master statistics from the most recent fetch; {@code null} if none are available */
 +  public static MasterMonitorInfo getMmi() {
 +    return Monitor.mmi;
 +  }
 +
 +  /** @return number of tables counted during the most recent fetch */
 +  public static int getTotalTables() {
 +    return Monitor.totalTables;
 +  }
 +
 +  /** @return tablet count summed over all tables */
 +  public static int getTotalTabletCount() {
 +    return Monitor.totalTabletCount;
 +  }
 +
 +  /** @return online tablet count summed over all tables */
 +  public static int getOnlineTabletCount() {
 +    return Monitor.onlineTabletCount;
 +  }
 +
 +  /** @return record count summed over all tablet servers */
 +  public static long getTotalEntries() {
 +    return Monitor.totalEntries;
 +  }
 +
 +  /** @return ingest rate summed over all tablet servers */
 +  public static double getTotalIngestRate() {
 +    return Monitor.totalIngestRate;
 +  }
 +
 +  /** @return ingest byte rate summed over all tablet servers, divided by 1000000.0 */
 +  public static double getTotalIngestByteRate() {
 +    return Monitor.totalIngestByteRate;
 +  }
 +
 +  /** @return query rate summed over all tablet servers */
 +  public static double getTotalQueryRate() {
 +    return Monitor.totalQueryRate;
 +  }
 +
 +  /** @return scan rate summed over all tablet servers */
 +  public static double getTotalScanRate() {
 +    return Monitor.totalScanRate;
 +  }
 +
 +  /** @return query byte rate summed over all tablet servers, divided by 1000000.0 */
 +  public static double getTotalQueryByteRate() {
 +    return Monitor.totalQueryByteRate;
 +  }
 +
 +  /** @return hold time summed over all tablet servers */
 +  public static long getTotalHoldTime() {
 +    return Monitor.totalHoldTime;
 +  }
 +
 +  /** @return the exception raised by the last problem-report summary, or {@code null} if it succeeded */
 +  public static Exception getProblemException() {
 +    return Monitor.problemException;
 +  }
 +
 +  /** @return the last problem-report summary; an empty map when it could not be obtained */
 +  public static Map<String,Map<ProblemType,Integer>> getProblemSummary() {
 +    return Monitor.problemSummary;
 +  }
 +
 +  /** @return the garbage collector status from the most recent fetch; {@code null} if it could not be obtained */
 +  public static GCStatus getGcStatus() {
 +    return Monitor.gcStatus;
 +  }
 +
 +  /** @return lookup count summed over all tablet servers */
 +  public static long getTotalLookups() {
 +    return Monitor.totalLookups;
 +  }
 +
 +  /** @return the {@code System.currentTimeMillis()} value recorded when {@code run} started the monitor */
 +  public static long getStartTime() {
 +    return Monitor.START_TIME;
 +  }
 +
 +  /**
 +   * Copies a history list while holding its own monitor, which is the lock the synchronized list wrapper uses for its operations.
 +   *
 +   * @param samples
 +   *          one of the synchronized history lists of this class
 +   * @return an independent copy of the list's current contents
 +   */
 +  private static <V> List<Pair<Long,V>> copyOfSamples(List<Pair<Long,V>> samples) {
 +    synchronized (samples) {
 +      return new ArrayList<Pair<Long,V>>(samples);
 +    }
 +  }
 +
 +  /** @return a copy of the summed OS load samples */
 +  public static List<Pair<Long,Double>> getLoadOverTime() {
 +    return copyOfSamples(loadOverTime);
 +  }
 +
 +  /** @return a copy of the total ingest rate samples */
 +  public static List<Pair<Long,Double>> getIngestRateOverTime() {
 +    return copyOfSamples(ingestRateOverTime);
 +  }
 +
 +  /** @return a copy of the total ingest byte rate samples */
 +  public static List<Pair<Long,Double>> getIngestByteRateOverTime() {
 +    return copyOfSamples(ingestByteRateOverTime);
 +  }
 +
 +  /** @return a copy of the recovery samples */
 +  public static List<Pair<Long,Integer>> getRecoveriesOverTime() {
 +    return copyOfSamples(recoveriesOverTime);
 +  }
 +
 +  /** @return a copy of the running minor compaction samples */
 +  public static List<Pair<Long,Integer>> getMinorCompactionsOverTime() {
 +    return copyOfSamples(minorCompactionsOverTime);
 +  }
 +
 +  /** @return a copy of the running major compaction samples */
 +  public static List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
 +    return copyOfSamples(majorCompactionsOverTime);
 +  }
 +
 +  /** @return a copy of the lookup rate samples */
 +  public static List<Pair<Long,Double>> getLookupsOverTime() {
 +    return copyOfSamples(lookupsOverTime);
 +  }
 +
 +  /** @return the lookup rate computed from the lookup tracker's current samples */
 +  public static double getLookupRate() {
 +    return lookupRateTracker.calculateRate();
 +  }
 +
 +  /** @return a copy of the total query rate samples */
 +  public static List<Pair<Long,Integer>> getQueryRateOverTime() {
 +    return copyOfSamples(queryRateOverTime);
 +  }
 +
 +  /** @return a copy of the total scan rate samples */
 +  public static List<Pair<Long,Integer>> getScanRateOverTime() {
 +    return copyOfSamples(scanRateOverTime);
 +  }
 +
 +  /** @return a copy of the total query byte rate samples */
 +  public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
 +    return copyOfSamples(queryByteRateOverTime);
 +  }
 +
 +  /** @return a copy of the index cache hit ratio samples; a sample's value is {@code null} when there were no requests */
 +  public static List<Pair<Long,Double>> getIndexCacheHitRateOverTime() {
 +    return copyOfSamples(indexCacheHitRateOverTime);
 +  }
 +
 +  /** @return a copy of the data cache hit ratio samples; a sample's value is {@code null} when there were no requests */
 +  public static List<Pair<Long,Double>> getDataCacheHitRateOverTime() {
 +    return copyOfSamples(dataCacheHitRateOverTime);
 +  }
 +
 +  /** @return the configuration held by the server configuration that {@code main} created */
 +  public static AccumuloConfiguration getSystemConfiguration() {
 +    return Monitor.config.getConfiguration();
 +  }
 +
 +  /** @return the instance this monitor is attached to; {@code null} until {@code main} has set it */
 +  public static Instance getInstance() {
 +    return Monitor.instance;
 +  }
 +
 +  /** @return whether the embedded web server reports that it is using SSL */
 +  public static boolean isUsingSsl() {
 +    return Monitor.server.isUsingSsl();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 70e8d7d,0000000..412f551
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,331 -1,0 +1,331 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
 +import static com.google.common.base.Charsets.UTF_8;
 +
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.AgeOffFilter;
- import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
++import org.apache.accumulo.server.security.SecurityUtil;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.thrift.RemoteSpan;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TByteArrayOutputStream;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.apache.zookeeper.Watcher.Event.KeeperState;
 +
 +public class TraceServer implements Watcher {
 +
 +  // Class-wide log4j logger.
 +  final private static Logger log = Logger.getLogger(TraceServer.class);
 +  // Configuration handed to the constructor; also used to find the instance root when registering in ZooKeeper.
 +  final private ServerConfiguration serverConfiguration;
 +  // Thrift server that receives spans; served from run() and stopped from process().
 +  final private TServer server;
 +  // Batch writer currently used for the trace table; holds null while resetWriter() cannot create a replacement.
 +  final private AtomicReference<BatchWriter> writer;
 +  // Connector obtained in the constructor with the configured trace user and token.
 +  final private Connector connector;
 +  // Name of the trace table, read from Property.TRACE_TABLE in the constructor.
 +  final String table;
 +
 +  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
 +    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
 +  }
 +
 +  /**
 +   * Transport that collects everything written to it in an in-memory buffer. It always reports itself as open, opening and closing do nothing, and reads never supply data.
 +   */
 +  static class ByteArrayTransport extends TTransport {
 +    TByteArrayOutputStream out = new TByteArrayOutputStream();
 +
 +    /** Always reports the transport as open. */
 +    @Override
 +    public boolean isOpen() {
 +      return true;
 +    }
 +
 +    /** Does nothing. */
 +    @Override
 +    public void open() throws TTransportException {
 +      // intentionally empty
 +    }
 +
 +    /** Does nothing. */
 +    @Override
 +    public void close() {
 +      // intentionally empty
 +    }
 +
 +    /** Never reads anything; always returns 0. */
 +    @Override
 +    public int read(byte[] buf, int off, int len) {
 +      final int bytesRead = 0;
 +      return bytesRead;
 +    }
 +
 +    /** Appends {@code len} bytes of {@code buf}, starting at {@code off}, to the buffer. */
 +    @Override
 +    public void write(byte[] buf, int off, int len) throws TTransportException {
 +      this.out.write(buf, off, len);
 +    }
 +
 +    /** Returns the array exposed by the underlying buffer; use together with {@link #len()}. */
 +    public byte[] get() {
 +      final byte[] backing = this.out.get();
 +      return backing;
 +    }
 +
 +    /** Returns the length reported by the underlying buffer. */
 +    public int len() {
 +      final int written = this.out.len();
 +      return written;
 +    }
 +  }
 +
 +  /**
 +   * Thrift handler that converts each received span into mutations and adds them to the current batch writer.
 +   */
 +  class Receiver implements Iface {
 +    /**
 +     * Builds a span mutation, an index mutation and, for root spans, a time mutation, then adds them to the writer held by the enclosing server. If no writer is
 +     * available, or the writer rejects the mutations, the span is discarded and a warning is logged.
 +     *
 +     * @param s
 +     *          the span to store
 +     */
 +    @Override
 +    public void span(RemoteSpan s) throws TException {
 +      String idString = Long.toHexString(s.traceId);
 +      String startString = Long.toHexString(s.start);
 +      Mutation spanMutation = new Mutation(new Text(idString));
 +      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
 +      long diff = s.stop - s.start;
 +      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(UTF_8)));
 +      // Serialize the span with the compact protocol into an in-memory buffer.
 +      ByteArrayTransport transport = new ByteArrayTransport();
 +      TCompactProtocol protocol = new TCompactProtocol(transport);
 +      s.write(protocol);
 +      String parentString = Long.toHexString(s.parentId);
 +      if (s.parentId == Span.ROOT_SPAN_ID) {
 +        parentString = "";
 +      }
 +      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
 +      // Map the root span to time so we can look up traces by time
 +      Mutation timeMutation = null;
 +      if (s.parentId == Span.ROOT_SPAN_ID) {
 +        timeMutation = new Mutation(new Text("start:" + startString));
 +        put(timeMutation, "id", idString, transport.get(), transport.len());
 +      }
 +      try {
 +        final BatchWriter writer = TraceServer.this.writer.get();
 +        /*
 +         * Check for null, because we expect spans to come in much faster than flush calls.
 +         * In the case of failure, we'd rather avoid logging tons of NPEs.
 +         */
 +        if (null == writer) {
 +          log.warn("writer is not ready; discarding span.");
 +          return;
 +        }
 +        writer.addMutation(spanMutation);
 +        writer.addMutation(indexMutation);
 +        if (timeMutation != null) {
 +          writer.addMutation(timeMutation);
 +        }
 +      } catch (MutationsRejectedException exception) {
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
 +        if (log.isDebugEnabled()) {
 +          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
 +        }
 +      } catch (RuntimeException exception) {
 +        /*
 +         * XXX this could be e.g. an IllegalArgumentException if we're trying to write this mutation to a writer that has been closed since we retrieved it
 +         */
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
 +        log.debug("unable to write mutation to table due to exception.", exception);
 +      }
 +    }
 +
 +  }
 +
 +  /**
 +   * Connects to Accumulo as the configured trace user, makes sure the trace table exists, binds the span receiver socket, registers this server in ZooKeeper
 +   * and creates the initial batch writer.
 +   * <p>
 +   * Connecting and checking/creating the table is retried in a loop, calling {@code UtilWaitThread.sleep(1000)} after each failure, until it succeeds. A newly
 +   * created table gets an {@link AgeOffFilter} iterator at priority 10 with a TTL of 7 days.
 +   *
 +   * @param serverConfiguration
 +   *          configuration used to read the trace properties and obtain the instance
 +   * @param hostname
 +   *          host name the server socket is bound to
 +   * @throws Exception
 +   *           if binding the socket, registering in ZooKeeper or creating the batch writer fails
 +   */
 +  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
 +    this.serverConfiguration = serverConfiguration;
 +    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
 +    table = conf.get(Property.TRACE_TABLE);
 +    Connector connector = null;
 +    while (true) {
 +      try {
 +        String principal = conf.get(Property.TRACE_USER);
 +        AuthenticationToken at;
 +        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +        if (loginMap.isEmpty()) {
 +          // No token properties configured: fall back to the trace password.
 +          Property p = Property.TRACE_PASSWORD;
 +          at = new PasswordToken(conf.get(p).getBytes(UTF_8));
 +        } else {
 +          Properties props = new Properties();
 +          AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
 +              .newInstance();
 +
 +          // Strip the property prefix so the token only sees its own property names.
 +          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
 +          for (Entry<String,String> entry : loginMap.entrySet()) {
 +            props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +          }
 +
 +          token.init(props);
 +
 +          at = token;
 +        }
 +
 +        connector = serverConfiguration.getInstance().getConnector(principal, at);
 +        if (!connector.tableOperations().exists(table)) {
 +          connector.tableOperations().create(table);
 +          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
 +          // 7 days in milliseconds; the L suffix forces long arithmetic.
 +          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000L);
 +          connector.tableOperations().attachIterator(table, setting);
 +        }
 +        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
 +        break;
 +      } catch (Exception ex) {
 +        log.info("Waiting to checking/create the trace table.", ex);
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    this.connector = connector;
 +    // make sure we refer to the final variable from now on.
 +    connector = null;
 +
 +    int port = conf.getPort(Property.TRACE_PORT);
 +    final ServerSocket sock = ServerSocketChannel.open().socket();
 +    sock.setReuseAddress(true);
 +    sock.bind(new InetSocketAddress(hostname, port));
 +    final TServerTransport transport = new TServerSocket(sock);
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.processor(new Processor<Iface>(new Receiver()));
 +    server = new TThreadPoolServer(options);
 +    // Advertise the address and port that were actually bound.
 +    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
 +    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
 +  }
 +
 +  /**
 +   * Schedules the recurring flush task on the shared {@link SimpleTimer} and then serves span requests on the calling thread.
 +   */
 +  public void run() throws Exception {
 +    final Runnable periodicFlush = new Runnable() {
 +      @Override
 +      public void run() {
 +        flush();
 +      }
 +    };
 +    SimpleTimer.getInstance().schedule(periodicFlush, 1000, 1000);
 +    server.serve();
 +  }
 +
 +  /**
 +   * Flushes the current batch writer. If there is no writer and the trace table exists, a new writer is created instead. Any failure while flushing is logged
 +   * and leads to the writer being replaced through {@link #resetWriter()}.
 +   */
 +  private void flush() {
 +    try {
 +      final BatchWriter writer = this.writer.get();
 +      if (null != writer) {
 +        writer.flush();
 +      } else if (connector.tableOperations().exists(table)) {
 +        // We don't have a writer. If the table exists, try to make a new writer.
 +        resetWriter();
 +      }
 +    } catch (MutationsRejectedException exception) {
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    } catch (RuntimeException exception) {
 +      /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    }
 +  }
 +
 +  private void resetWriter() {
 +    BatchWriter writer = null;
 +    try {
 +      writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5,
TimeUnit.SECONDS));
 +    } catch (Exception ex) {
 +      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see
stacktrace. cause: " + ex);
 +      log.debug("batch writer creation failed with exception.", ex);
 +    } finally {
 +      /* Trade in the new writer (even if null) for the one we need to close. */
 +      writer = this.writer.getAndSet(writer);
 +      try {
 +        if (null != writer) {
 +          writer.close();
 +        }
 +      } catch (Exception ex) {
 +        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace.
cause: " + ex);
 +        log.debug("batch writer close failed with exception", ex);
 +      }
 +    }
 +  }
 +
 +  private void registerInZooKeeper(String name) throws Exception {
 +    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(UTF_8));
 +    zoo.exists(path, this);
 +  }
 +
 +  /**
 +   * Entry point of the tracer process: logs in, parses the arguments, sets up logging and the server environment, then creates a {@link TraceServer} and runs
 +   * it until {@code run()} returns.
 +   *
 +   * @param args
 +   *          command line arguments, parsed by {@link ServerOpts}
 +   */
 +  public static void main(String[] args) throws Exception {
 +    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +    final String app = "tracer";
 +    final ServerOpts serverOpts = new ServerOpts();
 +    serverOpts.parseArgs(app, args);
 +    Accumulo.setupLogging(app);
 +    final Instance zooInstance = HdfsZooInstance.getInstance();
 +    final ServerConfiguration serverConf = new ServerConfiguration(zooInstance);
 +    final VolumeManager volumes = VolumeManagerImpl.get();
 +    Accumulo.init(volumes, serverConf, app);
 +    final String bindAddress = serverOpts.getAddress();
 +    final TraceServer tracer = new TraceServer(serverConf, bindAddress);
 +    Accumulo.enableTracing(bindAddress, app);
 +    tracer.run();
 +    log.info("tracer stopping");
 +  }
 +
 +  @Override
 +  public void process(WatchedEvent event) {
 +    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
 +    if (event.getState() == KeeperState.Expired) {
 +      log.warn("Trace server lost zookeeper registration at " + event.getPath());
 +      server.stop();
 +    } else if (event.getType() == EventType.NodeDeleted) {
 +      log.warn("Trace server zookeeper entry lost " + event.getPath());
 +      server.stop();
 +    }
 +    if (event.getPath() != null) {
 +      try {
 +        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
 +          return;
 +      } catch (Exception ex) {
 +        log.error(ex, ex);
 +      }
 +      log.warn("Trace server unable to reset watch on zookeeper registration");
 +      server.stop();
 +    }
 +  }
 +
 +}


Mime
View raw message