accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [07/50] [abbrv] Merge branch '1.5' into 1.6
Date Sat, 01 Nov 2014 04:57:01 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
index 665b132,0000000..4975428
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
@@@ -1,356 -1,0 +1,357 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.io.OutputStreamWriter;
 +import java.io.PrintWriter;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +
 +import javax.servlet.ServletException;
 +import javax.servlet.http.HttpServletRequest;
 +import javax.servlet.http.HttpServletResponse;
 +import javax.servlet.http.HttpSession;
 +
 +import jline.console.ConsoleReader;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.util.shell.Shell;
 +
 +public class ShellServlet extends BasicServlet {
 +  private static final long serialVersionUID = 1L;
 +  private Map<String,ShellExecutionThread> userShells = new HashMap<String,ShellExecutionThread>();
 +  private ExecutorService service = Executors.newCachedThreadPool();
 +
 +  public static final String CSRF_KEY = "csrf_token";
 +
 +  /**
 +   * Returns the constant title string for this servlet's page.
 +   *
 +   * @param req the current request (not consulted)
 +   * @return always {@code "Shell"}
 +   */
 +  @Override
 +  protected String getTitle(HttpServletRequest req) {
 +    return "Shell";
 +  }
 +  
 +  @Override
 +  protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder
sb) throws IOException {
 +    HttpSession session = req.getSession(true);
 +    final String CSRF_TOKEN;
 +    if (null == session.getAttribute(CSRF_KEY)) {
 +      // No token, make one
 +      CSRF_TOKEN = UUID.randomUUID().toString();
 +      session.setAttribute(CSRF_KEY, CSRF_TOKEN);
 +    } else {
 +      // Pull the token out of the session
 +      CSRF_TOKEN = (String) session.getAttribute(CSRF_KEY);
 +      if (null == CSRF_TOKEN) {
 +        throw new RuntimeException("No valid CSRF token exists in session");
 +      }
 +    }
 +
 +    String user = (String) session.getAttribute("user");
 +    if (user == null) {
 +      // user attribute is null, check to see if username and password are passed as parameters
 +      user = req.getParameter("user");
 +      String pass = req.getParameter("pass");
 +      String mock = req.getParameter("mock");
 +      if (user == null || pass == null) {
 +        // username or password are null, re-authenticate
 +        sb.append(authenticationForm(req.getRequestURI(), CSRF_TOKEN));
 +        return;
 +      }
 +      try {
 +        // get a new shell for this user
 +        ShellExecutionThread shellThread = new ShellExecutionThread(user, pass, mock);
 +        service.submit(shellThread);
 +        userShells.put(session.getId(), shellThread);
 +      } catch (IOException e) {
 +        // error validating user, reauthenticate
 +        sb.append("<div id='loginError'>Invalid user/password</div>" + authenticationForm(req.getRequestURI(),
CSRF_TOKEN));
 +        return;
 +      }
 +      session.setAttribute("user", user);
 +    }
 +    if (!userShells.containsKey(session.getId())) {
 +      // no existing shell for this user, re-authenticate
 +      sb.append(authenticationForm(req.getRequestURI(), UUID.randomUUID().toString()));
 +      return;
 +    }
 +
 +    ShellExecutionThread shellThread = userShells.get(session.getId());
 +    shellThread.getOutput();
 +    shellThread.printInfo();
 +    sb.append("<div id='shell'>\n");
 +    sb.append("<pre id='shellResponse'>").append(shellThread.getOutput()).append("</pre>\n");
 +    sb.append("<form><span id='shellPrompt'>").append(shellThread.getPrompt());
 +    sb.append("</span><input type='text' name='cmd' id='cmd' onkeydown='return
handleKeyDown(event.keyCode);'>\n");
 +    sb.append("</form>\n</div>\n");
 +    sb.append("<script type='text/javascript'>\n");
 +    sb.append("var url = '").append(req.getRequestURL().toString()).append("';\n");
 +    sb.append("var xmlhttp = new XMLHttpRequest();\n");
 +    sb.append("var hsize = 1000;\n");
 +    sb.append("var hindex = 0;\n");
 +    sb.append("var history = new Array();\n");
 +    sb.append("\n");
 +    sb.append("function handleKeyDown(keyCode) {\n");
 +    sb.append("  if (keyCode==13) {\n");
 +    sb.append("    submitCmd(document.getElementById('cmd').value);\n");
 +    sb.append("    hindex = history.length;\n");
 +    sb.append("    return false;\n");
 +    sb.append("  } else if (keyCode==38) {\n");
 +    sb.append("    hindex = hindex==0 ? history.length : hindex - 1;\n");
 +    sb.append("    if (hindex == history.length)\n");
 +    sb.append("      document.getElementById('cmd').value = '';\n");
 +    sb.append("    else\n");
 +    sb.append("      document.getElementById('cmd').value = history[hindex];\n");
 +    sb.append("    return false;\n");
 +    sb.append("  } else if (keyCode==40) {\n");
 +    sb.append("    hindex = hindex==history.length ? history.length : hindex + 1;\n");
 +    sb.append("    if (hindex == history.length)\n");
 +    sb.append("      document.getElementById('cmd').value = '';\n");
 +    sb.append("    else\n");
 +    sb.append("      document.getElementById('cmd').value = history[hindex];\n");
 +    sb.append("    return false;\n");
 +    sb.append("  }\n");
 +    sb.append("  return true;\n");
 +    sb.append("}\n");
 +    sb.append("\n");
 +    sb.append("function submitCmd(cmd) {\n");
 +    sb.append("  if (cmd=='history') {\n");
 +    sb.append("    document.getElementById('shellResponse').innerHTML += document.getElementById('shellPrompt').innerHTML+cmd+'\\n';\n");
 +    sb.append("    document.getElementById('shellResponse').innerHTML += history.join('\\n');\n");
 +    sb.append("    return\n");
 +    sb.append("  }\n");
 +    sb.append("  xmlhttp.open('POST',url+'?cmd='+cmd+'&'+'").append(CSRF_KEY).append("=").append(CSRF_TOKEN).append("',false);\n");
 +    sb.append("  xmlhttp.send();\n");
 +    sb.append("  var text = xmlhttp.responseText;\n");
 +    sb.append("  var index = text.lastIndexOf('\\n');\n");
 +    sb.append("  if (index >= 0) {\n");
 +    sb.append("    if (index > 0 && document.getElementById('cmd').type == 'text')
{\n");
 +    sb.append("      if (history.length == hsize)\n");
 +    sb.append("        history.shift()\n");
 +    sb.append("      history.push(cmd)\n");
 +    sb.append("    }\n");
 +    sb.append("    if (text.charAt(text.length-1)=='*') {\n");
 +    sb.append("      document.getElementById('cmd').type = 'password';\n");
 +    sb.append("      text = text.substring(0,xmlhttp.responseText.length-2);\n");
 +    sb.append("    } else {\n");
 +    sb.append("      document.getElementById('cmd').type = 'text';\n");
 +    sb.append("    }\n");
 +    sb.append("    document.getElementById('shellResponse').innerHTML += text.substring(0,index+1);\n");
 +    sb.append("    document.getElementById('shellPrompt').innerHTML = text.substring(index+1);\n");
 +    sb.append("    document.getElementById('cmd').value = '';\n");
 +    sb.append("    document.getElementById('shell').scrollTop = document.getElementById('cmd').offsetTop;\n");
 +    sb.append("  } else {\n");
 +    sb.append("    window.location = url;\n");
 +    sb.append("  }\n");
 +    sb.append("}\n");
 +    sb.append("</script>\n");
 +    sb.append("<script type='text/javascript'>window.onload = function() { document.getElementById('cmd').select();
}</script>\n");
 +  }
 +
 +  /**
 +   * Handles POSTs to the shell page: login form submissions and commands sent by the page's script.
 +   * <p>
 +   * Requests without an authenticated session, without a shell, or without a CSRF token in the session are handed to {@code doGet} so the
 +   * login form is shown. A request without a {@code cmd} parameter gets just the prompt. Otherwise the command is passed to the session's
 +   * shell and the response carries the HTML-escaped output, followed by the prompt unless the shell is waiting for input, followed by
 +   * {@code *} when the shell is masking input.
 +   *
 +   * @param req
 +   *          current request; {@code cmd} holds the command and {@link #CSRF_KEY} must echo the session's token
 +   * @param resp
 +   *          response receiving the shell output, or a 403 error when the CSRF token does not match
 +   * @throws ServletException
 +   *           propagated from {@code doGet}
 +   * @throws IOException
 +   *           if writing the response fails
 +   */
 +  @Override
 +  protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 +    final HttpSession session = req.getSession(true);
 +    String user = (String) session.getAttribute("user");
 +    if (user == null || !userShells.containsKey(session.getId())) {
 +      // no existing shell for user, re-authenticate
 +      doGet(req, resp);
 +      return;
 +    }
 +    final String csrfToken = (String) session.getAttribute(CSRF_KEY);
 +    if (null == csrfToken) {
 +      // no csrf token, need to re-auth; return so the command is not executed after the page was already written
 +      doGet(req, resp);
 +      return;
 +    }
 +    ShellExecutionThread shellThread = userShells.get(session.getId());
 +    String cmd = req.getParameter("cmd");
 +    if (cmd == null) {
 +      // the command is null, just print prompt
 +      resp.getWriter().append(shellThread.getPrompt());
 +      resp.getWriter().flush();
 +      return;
 +    }
 +    if (!csrfToken.equals(req.getParameter(CSRF_KEY))) {
 +      // the request does not echo the session's token: refuse to run the command
 +      resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Missing or invalid CSRF token");
 +      return;
 +    }
 +    shellThread.addInputString(cmd);
 +    shellThread.waitUntilReady();
 +    if (shellThread.isDone()) {
 +      // the command was exit, invalidate session
 +      userShells.remove(session.getId());
 +      session.invalidate();
 +      return;
 +    }
 +    // get the shell's output; '&' is escaped first so the entities produced for '<' and '>' are not escaped again
 +    StringBuilder sb = new StringBuilder();
 +    sb.append(shellThread.getOutput().replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;"));
 +    if (sb.length() == 0 || sb.charAt(sb.length() - 1) != '\n')
 +      sb.append("\n");
 +    // check if shell is waiting for input
 +    if (!shellThread.isWaitingForInput())
 +      sb.append(shellThread.getPrompt());
 +    // check if shell is waiting for password input
 +    if (shellThread.isMasking())
 +      sb.append("*");
 +    resp.getWriter().append(sb.toString());
 +    resp.getWriter().flush();
 +  }
 +
 +  /**
 +   * Builds the HTML login form.
 +   *
 +   * @param requestURI
 +   *          target of the form's POST; inserted into the {@code action} attribute as-is
 +   * @param csrfToken
 +   *          value placed in the hidden {@link #CSRF_KEY} field
 +   * @return the form markup with mock checkbox, username, password and hidden CSRF token fields
 +   */
 +  private String authenticationForm(String requestURI, String csrfToken) {
 +    return "<div id='login'><form method=POST action='" + requestURI + "'>"
 +        + "<table><tr><td>Mock:&nbsp;</td><td><input type='checkbox' name='mock' value='mock'></td></tr>"
 +        + "<tr><td>Username:&nbsp;</td><td><input type='text' name='user'></td></tr>"
 +        + "<tr><td>Password:&nbsp;</td><td><input type='password' name='pass'></td><td>"
 +        + "<input type='hidden' name='" + CSRF_KEY + "' value='" + csrfToken + "'/><input type='submit' value='Enter'></td></tr></table></form></div>";
 +  }
 +  
 +  /**
 +   * In-memory sink collecting everything written to it until it is fetched with {@link #get()} and emptied with {@link #clear()}.
 +   * <p>
 +   * Bytes are buffered raw and decoded as UTF-8 only when read back. Turning each byte into a char on its own would break every
 +   * multi-byte sequence produced by the UTF-8 writer that the shell is given for this stream.
 +   */
 +  private static class StringBuilderOutputStream extends OutputStream {
 +    private final java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
 +
 +    @Override
 +    public void write(int b) throws IOException {
 +      buffer.write(b);
 +    }
 +
 +    @Override
 +    public void write(byte[] b, int off, int len) throws IOException {
 +      buffer.write(b, off, len);
 +    }
 +
 +    /** Returns everything written since the last {@link #clear()}, decoded as UTF-8. */
 +    public String get() {
 +      return new String(buffer.toByteArray(), UTF_8);
 +    }
 +
 +    /** Discards everything buffered so far. */
 +    public void clear() {
 +      buffer.reset();
 +    }
 +  }
 +  
 +  /**
 +   * Runs one {@link Shell} and at the same time serves as the input stream handed to that shell's {@code ConsoleReader}.
 +   * <p>
 +   * A pending command is kept in {@code cmd}; {@link #addInputString(String)} sets it, {@link #run()} or {@link #read()} consumes it, and
 +   * {@link #waitUntilReady()} blocks until it has been consumed. All of these synchronize on this object and use wait/notifyAll.
 +   * <p>
 +   * NOTE(review): every {@code InterruptedException} is swallowed and the wait simply resumes; confirm that ignoring interruption is
 +   * intended.
 +   */
 +  private static class ShellExecutionThread extends InputStream implements Runnable {
 +    private Shell shell;
 +    StringBuilderOutputStream output;
 +    // command waiting to be consumed, or null when there is none
 +    private String cmd;
 +    // position of the next character of cmd that read() will return
 +    private int cmdIndex;
 +    // set once run() has left its loop because the shell reported it has exited
 +    private boolean done;
 +    // true while read() found no pending command and is waiting for one
 +    private boolean readWait;
 +    
 +    /**
 +     * Creates the shell, wiring this object in as its input and {@code output} as its output, and configures it with the credentials.
 +     *
 +     * @param mock
 +     *          when non-null, {@code --fake} is added to the shell configuration arguments
 +     * @throws IOException
 +     *           if {@code shell.config} returns true, which this code treats as a configuration error
 +     */
 +    private ShellExecutionThread(String username, String password, String mock) throws IOException
{
 +      this.done = false;
 +      this.cmd = null;
 +      this.cmdIndex = 0;
 +      this.readWait = false;
 +      this.output = new StringBuilderOutputStream();
 +      ConsoleReader reader = new ConsoleReader(this, output);
-       this.shell = new Shell(reader, new PrintWriter(new OutputStreamWriter(output, Constants.UTF8)));
++      this.shell = new Shell(reader, new PrintWriter(new OutputStreamWriter(output, UTF_8)));
 +      shell.setLogErrorsToConsole();
 +      if (mock != null) {
 +        if (shell.config("--fake", "-u", username, "-p", password))
 +          throw new IOException("mock shell config error");
 +      } else if (shell.config("-u", username, "-p", password)) {
 +        throw new IOException("shell config error");
 +      }
 +    }
 +    
 +    /**
 +     * Returns the next character of the pending command, then a newline once the command is exhausted; blocks while no command is
 +     * pending. While blocked, {@code readWait} is true so that {@link #isWaitingForInput()} reports it.
 +     * <p>
 +     * NOTE(review): the char value is returned as-is, so characters above 255 fall outside the 0-255 range {@code InputStream.read()}
 +     * is specified to return.
 +     */
 +    @Override
 +    public synchronized int read() throws IOException {
 +      if (cmd == null) {
 +        readWait = true;
 +        // wake a waitUntilReady() caller so it can see that input is being asked for
 +        this.notifyAll();
 +      }
 +      while (cmd == null) {
 +        try {
 +          this.wait();
 +        } catch (InterruptedException e) {}
 +      }
 +      readWait = false;
 +      int c;
 +      if (cmdIndex == cmd.length())
 +        c = '\n';
 +      else
 +        c = cmd.charAt(cmdIndex);
 +      cmdIndex++;
 +      if (cmdIndex > cmd.length()) {
 +        // the terminating newline has been handed out: the command is fully consumed
 +        cmd = null;
 +        cmdIndex = 0;
 +        this.notifyAll();
 +      }
 +      return c;
 +    }
 +    
 +    /**
 +     * Executes pending commands one at a time until the shell reports it has exited, then sets {@code done}.
 +     * <p>
 +     * The method is synchronized, so this object's monitor is held while a command executes and is only released inside {@code wait()}.
 +     * NOTE(review): an {@code IOException} from {@code execCommand} is ignored.
 +     */
 +    @Override
 +    public synchronized void run() {
 +      Thread.currentThread().setName("shell thread");
 +      while (!shell.hasExited()) {
 +        while (cmd == null) {
 +          try {
 +            this.wait();
 +          } catch (InterruptedException e) {}
 +        }
 +        // take ownership of the command and clear the slot before executing it
 +        String tcmd = cmd;
 +        cmd = null;
 +        cmdIndex = 0;
 +        try {
 +          shell.execCommand(tcmd, false, true);
 +        } catch (IOException e) {}
 +        this.notifyAll();
 +      }
 +      done = true;
 +      this.notifyAll();
 +    }
 +    
 +    /**
 +     * Hands a command (or a line of input) to the shell.
 +     *
 +     * @throws IllegalStateException
 +     *           if the shell has exited or a previous command has not been consumed yet
 +     */
 +    public synchronized void addInputString(String s) {
 +      if (done)
 +        throw new IllegalStateException("adding string to exited shell");
 +      if (cmd == null) {
 +        cmd = s;
 +      } else {
 +        throw new IllegalStateException("adding string to shell not waiting for input");
 +      }
 +      this.notifyAll();
 +    }
 +    
 +    /** Blocks until the pending command, if any, has been consumed. */
 +    public synchronized void waitUntilReady() {
 +      while (cmd != null) {
 +        try {
 +          this.wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +    
 +    /** Returns the output collected so far and empties the buffer. */
 +    public synchronized String getOutput() {
 +      String s = output.get();
 +      output.clear();
 +      return s;
 +    }
 +    
 +    /** Returns the shell's default prompt. */
 +    public String getPrompt() {
 +      return shell.getDefaultPrompt();
 +    }
 +    
 +    /** Asks the shell to print its info text. */
 +    public void printInfo() throws IOException {
 +      shell.printInfo();
 +    }
 +    
 +    /** Returns whether the shell reports that it is masking input. */
 +    public boolean isMasking() {
 +      return shell.isMasking();
 +    }
 +    
 +    /** Returns true while {@link #read()} is blocked waiting for input. */
 +    public synchronized boolean isWaitingForInput() {
 +      return readWait;
 +    }
 +    
 +    /**
 +     * Returns true once {@link #run()} has finished.
 +     * <p>
 +     * NOTE(review): reads {@code done} without synchronization although it is written under the monitor.
 +     */
 +    public boolean isDone() {
 +      return done;
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
index 0cbf957,0000000..3cd635e
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
@@@ -1,179 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.util.Map.Entry;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +
 +import javax.servlet.http.HttpServletRequest;
 +import javax.servlet.http.HttpServletResponse;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.master.thrift.Compacting;
 +import org.apache.accumulo.core.master.thrift.DeadServer;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.monitor.Monitor;
 +import org.apache.accumulo.monitor.util.celltypes.TServerLinkType;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.master.state.TabletServerState;
 +import org.apache.accumulo.server.util.TableInfoUtil;
 +
 +public class XMLServlet extends BasicServlet {
 +  private static final long serialVersionUID = 1L;
 +  
 +  /**
 +   * Returns the constant title string for this servlet's page.
 +   *
 +   * @param req the current request (not consulted)
 +   * @return always {@code "XML Report"}
 +   */
 +  @Override
 +  protected String getTitle(HttpServletRequest req) {
 +    return "XML Report";
 +  }
 +  
 +  /**
 +   * Starts the XML document: sets the response content type to {@code text/xml} with the UTF-8 charset, then writes the XML declaration
 +   * and the opening {@code <stats>} element into {@code sb}.
 +   */
 +  @Override
 +  protected void pageStart(HttpServletRequest req, HttpServletResponse resp, StringBuilder
sb) {
-     resp.setContentType("text/xml;charset=" + Constants.UTF8.name());
-     sb.append("<?xml version=\"1.0\" encoding=\"" + Constants.UTF8.name() + "\"?>\n");
++    resp.setContentType("text/xml;charset=" + UTF_8.name());
++    sb.append("<?xml version=\"1.0\" encoding=\"" + UTF_8.name() + "\"?>\n");
 +    sb.append("<stats>\n");
 +  }
 +  
 +  @Override
 +  protected void pageBody(HttpServletRequest req, HttpServletResponse resp, StringBuilder
sb) {
 +    double totalIngest = 0.;
 +    double totalQuery = 0.;
 +    double disk = 0.0;
 +    long totalEntries = 0L;
 +    
 +    sb.append("\n<servers>\n");
 +    if (Monitor.getMmi() == null || Monitor.getMmi().tableMap == null) {
 +      sb.append("</servers>\n");
 +      return;
 +    }
 +    SortedMap<String,TableInfo> tableStats = new TreeMap<String,TableInfo>(Monitor.getMmi().tableMap);
 +    
 +    for (TabletServerStatus status : Monitor.getMmi().tServerInfo) {
 +      
 +      sb.append("\n<server id='").append(status.name).append("'>\n");
 +      sb.append("<hostname>").append(TServerLinkType.displayName(status.name)).append("</hostname>");
 +      sb.append("<lastContact>").append(System.currentTimeMillis() - status.lastContact).append("</lastContact>\n");
 +      sb.append("<osload>").append(status.osLoad).append("</osload>\n");
 +      
 +      TableInfo summary = TableInfoUtil.summarizeTableStats(status);
 +      sb.append("<compactions>\n");
 +      sb.append("<major>").append("<running>").append(summary.majors.running).append("</running>").append("<queued>").append(summary.majors.queued)
 +          .append("</queued>").append("</major>\n");
 +      sb.append("<minor>").append("<running>").append(summary.minors.running).append("</running>").append("<queued>").append(summary.minors.queued)
 +          .append("</queued>").append("</minor>\n");
 +      sb.append("</compactions>\n");
 +      
 +      sb.append("<tablets>").append(summary.tablets).append("</tablets>\n");
 +      
 +      sb.append("<ingest>").append(summary.ingestRate).append("</ingest>\n");
 +      sb.append("<query>").append(summary.queryRate).append("</query>\n");
 +      sb.append("<ingestMB>").append(summary.ingestByteRate / 1000000.0).append("</ingestMB>\n");
 +      sb.append("<queryMB>").append(summary.queryByteRate / 1000000.0).append("</queryMB>\n");
 +      sb.append("<scans>").append(summary.scans.running + summary.scans.queued).append("</scans>");
 +      sb.append("<scansessions>").append(Monitor.getLookupRate()).append("</scansessions>\n");
 +      sb.append("<holdtime>").append(status.holdTime).append("</holdtime>\n");
 +      totalIngest += summary.ingestRate;
 +      totalQuery += summary.queryRate;
 +      totalEntries += summary.recs;
 +      sb.append("</server>\n");
 +    }
 +    sb.append("\n</servers>\n");
 +    
 +    sb.append("\n<masterGoalState>" + Monitor.getMmi().goalState + "</masterGoalState>\n");
 +    sb.append("\n<masterState>" + Monitor.getMmi().state + "</masterState>\n");
 +    
 +    sb.append("\n<badTabletServers>\n");
 +    for (Entry<String,Byte> entry : Monitor.getMmi().badTServers.entrySet()) {
 +      sb.append(String.format("<badTabletServer id='%s' status='%s'/>\n", entry.getKey(),
TabletServerState.getStateById(entry.getValue())));
 +    }
 +    sb.append("\n</badTabletServers>\n");
 +    
 +    sb.append("\n<tabletServersShuttingDown>\n");
 +    for (String server : Monitor.getMmi().serversShuttingDown) {
 +      sb.append(String.format("<server id='%s'/>\n", server));
 +    }
 +    sb.append("\n</tabletServersShuttingDown>\n");
 +    
 +    sb.append(String.format("\n<unassignedTablets>%d</unassignedTablets>\n",
Monitor.getMmi().unassignedTablets));
 +    
 +    sb.append("\n<deadTabletServers>\n");
 +    for (DeadServer dead : Monitor.getMmi().deadTabletServers) {
 +      sb.append(String.format("<deadTabletServer id='%s' lastChange='%d' status='%s'/>\n",
dead.server, dead.lastStatus, dead.status));
 +    }
 +    sb.append("\n</deadTabletServers>\n");
 +    
 +    sb.append("\n<deadLoggers>\n");
 +    for (DeadServer dead : Monitor.getMmi().deadTabletServers) {
 +      sb.append(String.format("<deadLogger id='%s' lastChange='%d' status='%s'/>\n",
dead.server, dead.lastStatus, dead.status));
 +    }
 +    sb.append("\n</deadLoggers>\n");
 +    
 +    sb.append("\n<tables>\n");
 +    Instance instance = HdfsZooInstance.getInstance();
 +    for (Entry<String,TableInfo> entry : tableStats.entrySet()) {
 +      TableInfo tableInfo = entry.getValue();
 +      
 +      sb.append("\n<table>\n");
 +      String tableId = entry.getKey();
 +      String tableName = "unknown";
 +      String tableState = "unknown";
 +      try {
 +        tableName = Tables.getTableName(instance, tableId);
 +        tableState = Tables.getTableState(instance, tableId).toString();
 +      } catch (Exception ex) {
 +        log.warn(ex, ex);
 +      }
 +      sb.append("<tablename>").append(tableName).append("</tablename>\n");
 +      sb.append("<tableId>").append(tableId).append("</tableId>\n");
 +      sb.append("<tableState>").append(tableState).append("</tableState>\n");
 +      sb.append("<tablets>").append(tableInfo.tablets).append("</tablets>\n");
 +      sb.append("<onlineTablets>").append(tableInfo.onlineTablets).append("</onlineTablets>\n");
 +      sb.append("<recs>").append(tableInfo.recs).append("</recs>\n");
 +      sb.append("<recsInMemory>").append(tableInfo.recsInMemory).append("</recsInMemory>\n");
 +      sb.append("<ingest>").append(tableInfo.ingestRate).append("</ingest>\n");
 +      sb.append("<ingestByteRate>").append(tableInfo.ingestByteRate).append("</ingestByteRate>\n");
 +      sb.append("<query>").append(tableInfo.queryRate).append("</query>\n");
 +      sb.append("<queryByteRate>").append(tableInfo.queryRate).append("</queryByteRate>\n");
 +      int running = 0;
 +      int queued = 0;
 +      Compacting compacting = entry.getValue().majors;
 +      if (compacting != null) {
 +        running = compacting.running;
 +        queued = compacting.queued;
 +      }
 +      sb.append("<majorCompactions>").append("<running>").append(running).append("</running>").append("<queued>").append(queued).append("</queued>")
 +          .append("</majorCompactions>\n");
 +      sb.append("</table>\n");
 +    }
 +    sb.append("\n</tables>\n");
 +    
 +    sb.append("\n<totals>\n");
 +    sb.append("<ingestrate>").append(totalIngest).append("</ingestrate>\n");
 +    sb.append("<queryrate>").append(totalQuery).append("</queryrate>\n");
 +    sb.append("<diskrate>").append(disk).append("</diskrate>\n");
 +    sb.append("<numentries>").append(totalEntries).append("</numentries>\n");
 +    sb.append("</totals>\n");
 +  }
 +  
 +  /** Writes the closing {@code </stats>} tag into {@code sb}. */
 +  @Override
 +  protected void pageEnd(HttpServletRequest req, HttpServletResponse resp, StringBuilder
sb) {
 +    sb.append("\n</stats>\n");
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
index 90bb0d4,0000000..1c464d4
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
@@@ -1,105 -1,0 +1,106 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets.trace;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.util.Date;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import javax.servlet.http.HttpServletRequest;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.monitor.Monitor;
 +import org.apache.accumulo.monitor.servlets.BasicServlet;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +
 +abstract class Basic extends BasicServlet {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  /**
 +   * Looks up a request parameter, substituting a default when the request does not carry it.
 +   *
 +   * @param req
 +   *          request to read from
 +   * @param name
 +   *          parameter name
 +   * @param defaultValue
 +   *          value returned when the parameter is absent
 +   * @return the parameter value, or {@code defaultValue} if {@code req.getParameter(name)} is null
 +   */
 +  public static String getStringParameter(HttpServletRequest req, String name, String defaultValue) {
 +    final String supplied = req.getParameter(name);
 +    return supplied != null ? supplied : defaultValue;
 +  }
 +
 +  /**
 +   * Reads a request parameter as an int, substituting a default when it is absent or not a valid integer.
 +   *
 +   * @param req
 +   *          request to read from
 +   * @param name
 +   *          parameter name
 +   * @param defaultMinutes
 +   *          value returned when the parameter is missing or cannot be parsed
 +   * @return the parsed value, or {@code defaultMinutes}
 +   */
 +  public static int getIntParameter(HttpServletRequest req, String name, int defaultMinutes) {
 +    final String raw = req.getParameter(name);
 +    if (raw != null) {
 +      try {
 +        return Integer.parseInt(raw);
 +      } catch (NumberFormatException ex) {
 +        // not a number: fall through to the default
 +      }
 +    }
 +    return defaultMinutes;
 +  }
 +
 +  /**
 +   * Formats a timestamp with {@link TraceFormatter#formatDate}.
 +   *
 +   * @param millis
 +   *          timestamp passed to the {@link Date} constructor
 +   * @return the formatted date
 +   */
 +  public static String dateString(long millis) {
 +    return TraceFormatter.formatDate(new Date(millis));
 +  }
 +
 +  /**
 +   * Creates a scanner over the configured trace table, logging in as the configured trace user.
 +   * <p>
 +   * The authentication token is a {@link PasswordToken} built from the trace password property when no trace token properties are
 +   * configured; otherwise a token of the configured type is instantiated and initialised from those properties.
 +   *
 +   * @param sb
 +   *          page buffer; receives an error heading when an {@link AccumuloSecurityException} is caught while connecting or creating the
 +   *          scanner
 +   * @return a scanner on the trace table, a {@code NullScanner} when the table does not exist, or {@code null} when that security
 +   *         exception was caught
 +   */
 +  protected Scanner getScanner(StringBuilder sb) throws AccumuloException, AccumuloSecurityException
{
 +    AccumuloConfiguration conf = Monitor.getSystemConfiguration();
 +    String principal = conf.get(Property.TRACE_USER);
 +    AuthenticationToken at;
 +    Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +    if (loginMap.isEmpty()) {
 +      // no token properties configured: use the trace password property
 +      Property p = Property.TRACE_PASSWORD;
-       at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8));
++      at = new PasswordToken(conf.get(p).getBytes(UTF_8));
 +    } else {
 +      Properties props = new Properties();
 +      // strip the common prefix so only the token's own property names remain
 +      int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
 +      for (Entry<String,String> entry : loginMap.entrySet()) {
 +        props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +      }
 +
 +      AuthenticationToken token = Property.createInstanceFromPropertyName(conf, Property.TRACE_TOKEN_TYPE,
AuthenticationToken.class, new PasswordToken());
 +      token.init(props);
 +      at = token;
 +    }
 +
 +    String table = conf.get(Property.TRACE_TABLE);
 +    try {
 +      Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at);
 +      if (!conn.tableOperations().exists(table)) {
 +        return new NullScanner();
 +      }
 +      Scanner scanner = conn.createScanner(table, conn.securityOperations().getUserAuthorizations(principal));
 +      return scanner;
 +    } catch (AccumuloSecurityException ex) {
 +      // callers must be prepared for the null returned here
 +      sb.append("<h2>Unable to read trace table: check trace username and password
configuration.</h2>\n");
 +      return null;
 +    } catch (TableNotFoundException ex) {
 +      return new NullScanner();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index e3ae0c8,0000000..6094911
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,324 -1,0 +1,326 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.thrift.RemoteSpan;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TByteArrayOutputStream;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.apache.zookeeper.Watcher.Event.KeeperState;
 +
 /**
  * Server that receives trace spans over Thrift and stores them in an Accumulo table.
  * <p>
  * The constructor connects to Accumulo with the configured trace user, makes sure the trace table
  * exists, opens the Thrift server socket, registers the server's address in ZooKeeper and creates
  * the batch writer. {@link #run()} then serves span requests while a timer task periodically
  * flushes the batch writer. The instance is also the ZooKeeper {@link Watcher} on its own
  * registration node and stops the Thrift server when that registration is lost.
  */
 +public class TraceServer implements Watcher {
 +
 +  final private static Logger log = Logger.getLogger(TraceServer.class);
 +  final private ServerConfiguration serverConfiguration;
 +  final private TServer server;
    // Current batch writer. resetWriter() stores its replacement here even when creating the
    // replacement failed, so readers must be prepared to see null.
 +  final private AtomicReference<BatchWriter> writer;
 +  final private Connector connector;
    // Name of the trace table, read from Property.TRACE_TABLE in the constructor.
 +  final String table;
 +
    /**
     * Adds one column to {@code m} whose value is the first {@code len} bytes of {@code bytes}.
     *
     * @param m
     *          mutation to add the column to
     * @param cf
     *          column family
     * @param cq
     *          column qualifier
     * @param bytes
     *          buffer holding the value
     * @param len
     *          number of bytes of {@code bytes}, starting at offset 0, that make up the value
     */
 +  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
 +    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
 +  }
 +
    /**
     * Write-only, in-memory Thrift transport: everything written is collected in a
     * {@link TByteArrayOutputStream}. It always reports itself open, open/close do nothing and
     * reads always return 0.
     */
 +  static class ByteArrayTransport extends TTransport {
 +    TByteArrayOutputStream out = new TByteArrayOutputStream();
 +
 +    @Override
 +    public boolean isOpen() {
 +      return true;
 +    }
 +
 +    @Override
 +    public void open() throws TTransportException {}
 +
 +    @Override
 +    public void close() {}
 +
 +    @Override
 +    public int read(byte[] buf, int off, int len) {
 +      return 0;
 +    }
 +
 +    @Override
 +    public void write(byte[] buf, int off, int len) throws TTransportException {
 +      out.write(buf, off, len);
 +    }
 +
      // NOTE(review): presumably this is the stream's backing buffer, which may be longer than
      // the data written; the callers in this class always pair it with len(). Confirm against
      // TByteArrayOutputStream.
 +    public byte[] get() {
 +      return out.get();
 +    }
 +
 +    public int len() {
 +      return out.len();
 +    }
 +  }
 +
    /**
     * Thrift handler that turns each received span into mutations for the trace table.
     */
 +  class Receiver implements Iface {
      /**
       * Builds the mutations for one span and hands them to the current batch writer.
       * <ul>
       * <li>span entry: row = hex trace id, family "span", qualifier = hex parent id (empty for a
       * root span) + ":" + hex span id, value = the span serialized with the compact protocol</li>
       * <li>index entry: row = "idx:" + service + ":" + hex start time, family = description,
       * qualifier = sender, value = hex trace id + ":" + hex duration (stop - start)</li>
       * <li>time entry (root spans only): row = "start:" + hex start time, family "id",
       * qualifier = hex trace id, value = the serialized span</li>
       * </ul>
       * If no writer is available, or the writer rejects the mutations or throws a runtime
       * exception, the span is discarded and the problem is logged; nothing is rethrown.
       *
       * @param s
       *          the span to record
       * @throws TException
       *           declared by the Thrift interface; {@code s.write(protocol)} is the only call in
       *           this method outside the try block
       */
 +    @Override
 +    public void span(RemoteSpan s) throws TException {
 +      String idString = Long.toHexString(s.traceId);
 +      String startString = Long.toHexString(s.start);
 +      Mutation spanMutation = new Mutation(new Text(idString));
 +      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
 +      long diff = s.stop - s.start;
-       indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString
+ ":" + Long.toHexString(diff)).getBytes(Constants.UTF8)));
++      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString
+ ":" + Long.toHexString(diff)).getBytes(UTF_8)));
        // Serialize the whole span into an in-memory buffer; the bytes become the cell value.
 +      ByteArrayTransport transport = new ByteArrayTransport();
 +      TCompactProtocol protocol = new TCompactProtocol(transport);
 +      s.write(protocol);
 +      String parentString = Long.toHexString(s.parentId);
 +      if (s.parentId == Span.ROOT_SPAN_ID)
 +        parentString = "";
 +      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(),
transport.len());
 +      // Map the root span to time so we can look up traces by time
 +      Mutation timeMutation = null;
 +      if (s.parentId == Span.ROOT_SPAN_ID) {
 +        timeMutation = new Mutation(new Text("start:" + startString));
 +        put(timeMutation, "id", idString, transport.get(), transport.len());
 +      }
 +      try {
 +        final BatchWriter writer = TraceServer.this.writer.get();
 +        /*
 +         * Check for null, because we expect spans to come in much faster than flush calls.
In the case of failure, we'd rather avoid logging tons of NPEs.
 +         */
 +        if (null == writer) {
 +          log.warn("writer is not ready; discarding span.");
 +          return;
 +        }
 +        writer.addMutation(spanMutation);
 +        writer.addMutation(indexMutation);
 +        if (timeMutation != null)
 +          writer.addMutation(timeMutation);
 +      } catch (MutationsRejectedException exception) {
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG
for span information and stacktrace. cause: " + exception);
 +        if (log.isDebugEnabled()) {
 +          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
 +        }
 +        /* XXX this could be e.g. an IllegalArgumentException if we're trying to write
this mutation to a writer that has been closed since we retrieved it */
 +      } catch (RuntimeException exception) {
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG
for stacktrace. cause: " + exception);
 +        log.debug("unable to write mutation to table due to exception.", exception);
 +      }
 +    }
 +
 +  }
 +
    /**
     * Connects to Accumulo, prepares the trace table, opens the Thrift server socket, registers
     * the server in ZooKeeper and creates the initial batch writer.
     * <p>
     * The connection/table setup is retried in an endless loop: any exception is logged at info
     * level and followed by {@code UtilWaitThread.sleep(1000)} before the next attempt.
     *
     * @param serverConfiguration
     *          source of the Accumulo configuration and instance
     * @param hostname
     *          host name or address the server socket is bound to
     * @throws Exception
     *           if opening the socket, registering in ZooKeeper or creating the batch writer fails
     */
 +  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception
{
 +    this.serverConfiguration = serverConfiguration;
 +    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
 +    table = conf.get(Property.TRACE_TABLE);
 +    Connector connector = null;
 +    while (true) {
 +      try {
 +        String principal = conf.get(Property.TRACE_USER);
 +        AuthenticationToken at;
          // Without any token properties configured, fall back to the plain trace password.
 +        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +        if (loginMap.isEmpty()) {
 +          Property p = Property.TRACE_PASSWORD;
-           at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8));
++          at = new PasswordToken(conf.get(p).getBytes(UTF_8));
 +        } else {
            // Instantiate the configured token class and initialize it with the token
            // properties, keyed by their names with the common prefix removed.
 +          Properties props = new Properties();
 +          AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
 +              .newInstance();
 +
 +          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
 +          for (Entry<String,String> entry : loginMap.entrySet()) {
 +            props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +          }
 +
 +          token.init(props);
 +
 +          at = token;
 +        }
 +
 +        connector = serverConfiguration.getInstance().getConnector(principal, at);
 +        if (!connector.tableOperations().exists(table)) {
            // A newly created trace table gets an age-off filter (priority 10, name "ageoff")
            // whose TTL is 7 days expressed in milliseconds.
 +          connector.tableOperations().create(table);
 +          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
 +          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
 +          connector.tableOperations().attachIterator(table, setting);
 +        }
 +        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(),
TraceFormatter.class.getName());
 +        break;
 +      } catch (Exception ex) {
 +        log.info("Waiting to checking/create the trace table.", ex);
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    this.connector = connector;
 +    // make sure we refer to the final variable from now on.
 +    connector = null;
 +
 +    int port = conf.getPort(Property.TRACE_PORT);
 +    final ServerSocket sock = ServerSocketChannel.open().socket();
 +    sock.setReuseAddress(true);
 +    sock.bind(new InetSocketAddress(hostname, port));
 +    final TServerTransport transport = new TServerSocket(sock);
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.processor(new Processor<Iface>(new Receiver()));
 +    server = new TThreadPoolServer(options);
      // Advertise the address and port the socket was actually bound to.
 +    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
 +    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table,
new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
 +  }
 +
    /**
     * Schedules the periodic {@link #flush()} task and then serves Thrift requests.
     *
     * @throws Exception
     *           declared for callers; the visible body only schedules the task and calls
     *           {@code server.serve()}
     */
 +  public void run() throws Exception {
      // NOTE(review): the two 1000 arguments are presumably initial delay and period in
      // milliseconds - confirm against SimpleTimer.schedule.
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        flush();
 +      }
 +    }, 1000, 1000);
      // NOTE(review): presumably blocks until server.stop() is called (main() logs
      // "tracer stopping" right after run() returns) - confirm against TServer.serve.
 +    server.serve();
 +  }
 +
    /**
     * Flushes the current batch writer, if there is one. When the flush fails with a
     * {@link MutationsRejectedException} or any {@link RuntimeException}, the failure is logged
     * and the writer is replaced via {@link #resetWriter()}.
     */
 +  private void flush() {
 +    try {
 +      final BatchWriter writer = this.writer.get();
 +      if (null != writer) {
 +        writer.flush();
 +      }
 +    } catch (MutationsRejectedException exception) {
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see
stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +      /* XXX e.g. if the writer was closed between when we grabbed it and when we called
flush. */
 +    } catch (RuntimeException exception) {
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see
stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    }
 +  }
 +
    /**
     * Replaces the current batch writer with a newly created one and closes the old one.
     * <p>
     * If creating the new writer fails, null is stored instead, so spans are discarded by
     * {@code Receiver.span} until a later reset succeeds. Failures while creating or closing a
     * writer are logged and not propagated.
     */
 +  private void resetWriter() {
 +    BatchWriter writer = null;
 +    try {
 +      writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5,
TimeUnit.SECONDS));
 +    } catch (Exception ex) {
 +      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see
stacktrace. cause: " + ex);
 +      log.debug("batch writer creation failed with exception.", ex);
 +    } finally {
 +      /* Trade in the new writer (even if null) for the one we need to close. */
 +      writer = this.writer.getAndSet(writer);
 +      try {
 +        if (null != writer) {
 +          writer.close();
 +        }
 +      } catch (Exception ex) {
 +        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace.
cause: " + ex);
 +        log.debug("batch writer close failed with exception", ex);
 +      }
 +    }
 +  }
 +
    /**
     * Creates an ephemeral sequential ZooKeeper node named "trace-" under the instance's tracer
     * root, with {@code name} (UTF-8 encoded) as its data, and sets this object as the watcher on
     * the created node.
     *
     * @param name
     *          data stored in the node; the constructor passes "address:port"
     * @throws Exception
     *           if a ZooKeeper operation fails
     */
 +  private void registerInZooKeeper(String name) throws Exception {
 +    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-     String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(Constants.UTF8));
++    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(UTF_8));
 +    zoo.exists(path, this);
 +  }
 +
    /**
     * Command-line entry point: performs the server login, parses the options, sets up logging
     * and the Accumulo environment, then constructs and runs a {@link TraceServer} bound to the
     * configured address.
     *
     * @param args
     *          command-line arguments, parsed by {@link ServerOpts}
     * @throws Exception
     *           if startup or serving fails
     */
 +  public static void main(String[] args) throws Exception {
 +    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +    ServerOpts opts = new ServerOpts();
 +    final String app = "tracer";
 +    opts.parseArgs(app, args);
 +    Accumulo.setupLogging(app);
 +    Instance instance = HdfsZooInstance.getInstance();
 +    ServerConfiguration conf = new ServerConfiguration(instance);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.init(fs, conf, app);
 +    String hostname = opts.getAddress();
 +    TraceServer server = new TraceServer(conf, hostname);
 +    Accumulo.enableTracing(hostname, app);
 +    server.run();
 +    log.info("tracer stopping");
 +  }
 +
    /**
     * ZooKeeper watch callback for the registration node.
     * <p>
     * Stops the Thrift server when the session has expired or the node was deleted. In every
     * case where the event carries a path, the watch is then re-registered; if the node no longer
     * exists or the check throws, the server is stopped as well.
     *
     * @param event
     *          the ZooKeeper event being delivered
     */
 +  @Override
 +  public void process(WatchedEvent event) {
 +    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
 +    if (event.getState() == KeeperState.Expired) {
 +      log.warn("Trace server lost zookeeper registration at " + event.getPath());
 +      server.stop();
 +    } else if (event.getType() == EventType.NodeDeleted) {
 +      log.warn("Trace server zookeeper entry lost " + event.getPath());
 +      server.stop();
 +    }
      // Not skipped after a stop above: the watch reset below also runs for expired/deleted
      // events that carry a path.
 +    if (event.getPath() != null) {
 +      try {
 +        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
 +          return;
 +      } catch (Exception ex) {
 +        log.error(ex, ex);
 +      }
 +      log.warn("Trace server unable to reset watch on zookeeper registration");
 +      server.stop();
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
index 39bf81e,0000000..4d9a6c2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
@@@ -1,75 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.IOException;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.FileUtil;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Copy failed bulk imports.
 + * <p>
 + * Each work item's payload is a UTF-8 string holding a source path and a destination path
 + * separated by a comma. The source is copied to a ".tmp" sibling of the destination and then
 + * renamed to the destination. If the copy or rename throws an {@link IOException}, an empty file
 + * is created at the destination path instead, as a failure flag.
 + */
 +public class BulkFailedCopyProcessor implements Processor {
 +
 +  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
 +
    /**
     * @return a new, independent {@code BulkFailedCopyProcessor}
     */
 +  @Override
 +  public Processor newProcessor() {
 +    return new BulkFailedCopyProcessor();
 +  }
 +
    /**
     * Copies one failed bulk-import file to its destination.
     *
     * @param workID
     *          identifier of the work item; not used by this method
     * @param data
     *          UTF-8 bytes of "source,destination"
     */
 +  @Override
 +  public void process(String workID, byte[] data) {
 +
-     String paths[] = new String(data, Constants.UTF8).split(",");
++    String paths[] = new String(data, UTF_8).split(",");
 +
      // NOTE(review): paths[1] is read without a length check, so a payload without a comma
      // would throw ArrayIndexOutOfBoundsException - confirm producers always send two paths.
 +    Path orig = new Path(paths[0]);
 +    Path dest = new Path(paths[1]);
      // Temporary target in the destination's parent directory: "<dest name>.tmp".
 +    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
 +
 +    try {
 +      VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration());
 +      FileSystem origFs = TraceFileSystem.wrap(vm.getVolumeByPath(orig).getFileSystem());
 +      FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem());
 +      
        // NOTE(review): per Hadoop's FileUtil.copy signature the two booleans should be
        // deleteSource=false and overwrite=true - confirm. The boolean result of rename() is
        // not checked here.
 +      FileUtil.copy(origFs, orig, destFs, tmp, false, true, CachedConfiguration.getInstance());
 +      destFs.rename(tmp, dest);
 +      log.debug("copied " + orig + " to " + dest);
 +    } catch (IOException ex) {
 +      try {
          // Copy failed: leave an empty file at the destination path as the failure flag.
 +        VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration());
 +        FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem());
 +        destFs.create(dest).close();
 +        log.warn(" marked " + dest + " failed", ex);
 +      } catch (IOException e) {
 +        log.error("Unable to create failure flag file " + dest, e);
 +      }
 +    }
 +
 +  }
 +
 +}


Mime
View raw message