Return-Path:
X-Original-To: apmail-accumulo-commits-archive@www.apache.org
Delivered-To: apmail-accumulo-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E71ED177A9 for ; Sat, 1 Nov 2014 04:56:57 +0000 (UTC)
Received: (qmail 82234 invoked by uid 500); 1 Nov 2014 04:56:57 -0000
Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org
Received: (qmail 82197 invoked by uid 500); 1 Nov 2014 04:56:57 -0000
Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@accumulo.apache.org
Delivered-To: mailing list commits@accumulo.apache.org
Received: (qmail 82188 invoked by uid 99); 1 Nov 2014 04:56:57 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Nov 2014 04:56:57 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5A59899362F; Sat, 1 Nov 2014 04:56:56 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: ctubbsii@apache.org
To: commits@accumulo.apache.org
Date: Sat, 01 Nov 2014 04:57:01 -0000
Message-Id:
In-Reply-To:
References:
X-Mailer: ASF-Git Admin Mailer
Subject: [07/50] [abbrv] Merge branch '1.5' into 1.6

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
index 665b132,0000000..4975428
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
@@@ -1,356 -1,0 +1,357 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.monitor.servlets; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; + +import jline.console.ConsoleReader; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.util.shell.Shell; + +public class ShellServlet extends BasicServlet { + private static final long serialVersionUID = 1L; + private Map userShells = new HashMap(); + private ExecutorService service = Executors.newCachedThreadPool(); + + public static final String CSRF_KEY = "csrf_token"; + + @Override + protected String getTitle(HttpServletRequest req) { + return "Shell"; + } + + @Override + protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws IOException { + HttpSession session = req.getSession(true); + final String CSRF_TOKEN; + if (null == session.getAttribute(CSRF_KEY)) { + // No token, make one + CSRF_TOKEN = UUID.randomUUID().toString(); + session.setAttribute(CSRF_KEY, CSRF_TOKEN); + } else { + // Pull the token out of the session + CSRF_TOKEN = (String) session.getAttribute(CSRF_KEY); + if (null == CSRF_TOKEN) { + throw new RuntimeException("No valid CSRF token exists in session"); + } + } + + String user = (String) session.getAttribute("user"); + if (user == null) { + // user attribute is null, check to see if username and password are passed as parameters + user = req.getParameter("user"); + String pass = req.getParameter("pass"); + String mock = req.getParameter("mock"); + if (user == null || pass == null) { + // username or password are null, re-authenticate + sb.append(authenticationForm(req.getRequestURI(), CSRF_TOKEN)); + return; + } + try { + // get a new shell for this user + ShellExecutionThread shellThread = new ShellExecutionThread(user, pass, mock); + service.submit(shellThread); + userShells.put(session.getId(), shellThread); + } catch (IOException e) { + // error validating user, reauthenticate + sb.append("
Invalid user/password
" + authenticationForm(req.getRequestURI(), CSRF_TOKEN)); + return; + } + session.setAttribute("user", user); + } + if (!userShells.containsKey(session.getId())) { + // no existing shell for this user, re-authenticate + sb.append(authenticationForm(req.getRequestURI(), UUID.randomUUID().toString())); + return; + } + + ShellExecutionThread shellThread = userShells.get(session.getId()); + shellThread.getOutput(); + shellThread.printInfo(); + sb.append("
\n"); + sb.append("
").append(shellThread.getOutput()).append("
\n"); + sb.append("
").append(shellThread.getPrompt()); + sb.append("\n"); + sb.append("
\n
\n"); + sb.append("\n"); + sb.append("\n"); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + final HttpSession session = req.getSession(true); + String user = (String) session.getAttribute("user"); + if (user == null || !userShells.containsKey(session.getId())) { + // no existing shell for user, re-authenticate + doGet(req, resp); + return; + } + final String CSRF_TOKEN = (String) session.getAttribute(CSRF_KEY); + if (null == CSRF_TOKEN) { + // no csrf token, need to re-auth + doGet(req, resp); + } + ShellExecutionThread shellThread = userShells.get(session.getId()); + String cmd = req.getParameter("cmd"); + if (cmd == null) { + // the command is null, just print prompt + resp.getWriter().append(shellThread.getPrompt()); + resp.getWriter().flush(); + return; + } + shellThread.addInputString(cmd); + shellThread.waitUntilReady(); + if (shellThread.isDone()) { + // the command was exit, invalidate session + userShells.remove(session.getId()); + session.invalidate(); + return; + } + // get the shell's output + StringBuilder sb = new StringBuilder(); + sb.append(shellThread.getOutput().replace("<", "<").replace(">", ">")); + if (sb.length() == 0 || !(sb.charAt(sb.length() - 1) == '\n')) + sb.append("\n"); + // check if shell is waiting for input + if (!shellThread.isWaitingForInput()) + sb.append(shellThread.getPrompt()); + // check if shell is waiting for password input + if (shellThread.isMasking()) + sb.append("*"); + resp.getWriter().append(sb.toString()); + resp.getWriter().flush(); + } + + private String authenticationForm(String requestURI, String csrfToken) { + return "
" + + "" + + "" + + "
Mock: 
Username: 
Password: " + + "
"; + } + + private static class StringBuilderOutputStream extends OutputStream { + StringBuilder sb = new StringBuilder(); + + @Override + public void write(int b) throws IOException { + sb.append((char) (0xff & b)); + } + + public String get() { + return sb.toString(); + } + + public void clear() { + sb.setLength(0); + } + } + + private static class ShellExecutionThread extends InputStream implements Runnable { + private Shell shell; + StringBuilderOutputStream output; + private String cmd; + private int cmdIndex; + private boolean done; + private boolean readWait; + + private ShellExecutionThread(String username, String password, String mock) throws IOException { + this.done = false; + this.cmd = null; + this.cmdIndex = 0; + this.readWait = false; + this.output = new StringBuilderOutputStream(); + ConsoleReader reader = new ConsoleReader(this, output); - this.shell = new Shell(reader, new PrintWriter(new OutputStreamWriter(output, Constants.UTF8))); ++ this.shell = new Shell(reader, new PrintWriter(new OutputStreamWriter(output, UTF_8))); + shell.setLogErrorsToConsole(); + if (mock != null) { + if (shell.config("--fake", "-u", username, "-p", password)) + throw new IOException("mock shell config error"); + } else if (shell.config("-u", username, "-p", password)) { + throw new IOException("shell config error"); + } + } + + @Override + public synchronized int read() throws IOException { + if (cmd == null) { + readWait = true; + this.notifyAll(); + } + while (cmd == null) { + try { + this.wait(); + } catch (InterruptedException e) {} + } + readWait = false; + int c; + if (cmdIndex == cmd.length()) + c = '\n'; + else + c = cmd.charAt(cmdIndex); + cmdIndex++; + if (cmdIndex > cmd.length()) { + cmd = null; + cmdIndex = 0; + this.notifyAll(); + } + return c; + } + + @Override + public synchronized void run() { + Thread.currentThread().setName("shell thread"); + while (!shell.hasExited()) { + while (cmd == null) { + try { + this.wait(); + } catch (InterruptedException e) {} + } + String tcmd = cmd; + cmd = null; + cmdIndex = 0; + try { + shell.execCommand(tcmd, false, true); + } catch (IOException e) {} + this.notifyAll(); + } + done = true; + this.notifyAll(); + } + + public synchronized void addInputString(String s) { + if (done) + throw new IllegalStateException("adding string to exited shell"); + if (cmd == null) { + cmd = s; + } else { + throw new IllegalStateException("adding string to shell not waiting for input"); + } + this.notifyAll(); + } + + public synchronized void waitUntilReady() { + while (cmd != null) { + try { + this.wait(); + } catch (InterruptedException e) {} + } + } + + public synchronized String getOutput() { + String s = output.get(); + output.clear(); + return s; + } + + public String getPrompt() { + return shell.getDefaultPrompt(); + } + + public void printInfo() throws IOException { + shell.printInfo(); + } + + public boolean isMasking() { + return shell.isMasking(); + } + + public synchronized boolean isWaitingForInput() { + return readWait; + } + + public boolean isDone() { + return done; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java ---------------------------------------------------------------------- diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java index 0cbf957,0000000..3cd635e mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java +++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java @@@ -1,179 -1,0 +1,180 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.servlets; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.master.thrift.Compacting; +import org.apache.accumulo.core.master.thrift.DeadServer; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.monitor.util.celltypes.TServerLinkType; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.TabletServerState; +import org.apache.accumulo.server.util.TableInfoUtil; + +public class XMLServlet extends BasicServlet { + private static final long serialVersionUID = 1L; + + @Override + protected String getTitle(HttpServletRequest req) { + return "XML Report"; + } + + @Override + protected void pageStart(HttpServletRequest req, HttpServletResponse resp, StringBuilder sb) { - resp.setContentType("text/xml;charset=" + Constants.UTF8.name()); - sb.append("\n"); ++ resp.setContentType("text/xml;charset=" + UTF_8.name()); ++ sb.append("\n"); + sb.append("\n"); + } + + @Override + protected void pageBody(HttpServletRequest req, HttpServletResponse resp, StringBuilder sb) { + double totalIngest = 0.; + double totalQuery = 0.; + double disk = 0.0; + long totalEntries = 0L; + + sb.append("\n\n"); + if (Monitor.getMmi() == null || Monitor.getMmi().tableMap == null) { + sb.append("\n"); + return; + } + SortedMap tableStats = new TreeMap(Monitor.getMmi().tableMap); + + for (TabletServerStatus status : Monitor.getMmi().tServerInfo) { + + sb.append("\n\n"); + sb.append("").append(TServerLinkType.displayName(status.name)).append(""); + sb.append("").append(System.currentTimeMillis() - status.lastContact).append("\n"); + sb.append("").append(status.osLoad).append("\n"); + + TableInfo summary = TableInfoUtil.summarizeTableStats(status); + sb.append("\n"); + sb.append("").append("").append(summary.majors.running).append("").append("").append(summary.majors.queued) + .append("").append("\n"); + sb.append("").append("").append(summary.minors.running).append("").append("").append(summary.minors.queued) + .append("").append("\n"); + sb.append("\n"); + + sb.append("").append(summary.tablets).append("\n"); + + 
sb.append("").append(summary.ingestRate).append("\n"); + sb.append("").append(summary.queryRate).append("\n"); + sb.append("").append(summary.ingestByteRate / 1000000.0).append("\n"); + sb.append("").append(summary.queryByteRate / 1000000.0).append("\n"); + sb.append("").append(summary.scans.running + summary.scans.queued).append(""); + sb.append("").append(Monitor.getLookupRate()).append("\n"); + sb.append("").append(status.holdTime).append("\n"); + totalIngest += summary.ingestRate; + totalQuery += summary.queryRate; + totalEntries += summary.recs; + sb.append("\n"); + } + sb.append("\n\n"); + + sb.append("\n" + Monitor.getMmi().goalState + "\n"); + sb.append("\n" + Monitor.getMmi().state + "\n"); + + sb.append("\n\n"); + for (Entry entry : Monitor.getMmi().badTServers.entrySet()) { + sb.append(String.format("\n", entry.getKey(), TabletServerState.getStateById(entry.getValue()))); + } + sb.append("\n\n"); + + sb.append("\n\n"); + for (String server : Monitor.getMmi().serversShuttingDown) { + sb.append(String.format("\n", server)); + } + sb.append("\n\n"); + + sb.append(String.format("\n%d\n", Monitor.getMmi().unassignedTablets)); + + sb.append("\n\n"); + for (DeadServer dead : Monitor.getMmi().deadTabletServers) { + sb.append(String.format("\n", dead.server, dead.lastStatus, dead.status)); + } + sb.append("\n\n"); + + sb.append("\n\n"); + for (DeadServer dead : Monitor.getMmi().deadTabletServers) { + sb.append(String.format("\n", dead.server, dead.lastStatus, dead.status)); + } + sb.append("\n\n"); + + sb.append("\n\n"); + Instance instance = HdfsZooInstance.getInstance(); + for (Entry entry : tableStats.entrySet()) { + TableInfo tableInfo = entry.getValue(); + + sb.append("\n\n"); + String tableId = entry.getKey(); + String tableName = "unknown"; + String tableState = "unknown"; + try { + tableName = Tables.getTableName(instance, tableId); + tableState = Tables.getTableState(instance, tableId).toString(); + } catch (Exception ex) { + log.warn(ex, ex); + } + sb.append("").append(tableName).append("\n"); + sb.append("").append(tableId).append("\n"); + sb.append("").append(tableState).append("\n"); + sb.append("").append(tableInfo.tablets).append("\n"); + sb.append("").append(tableInfo.onlineTablets).append("\n"); + sb.append("").append(tableInfo.recs).append("\n"); + sb.append("").append(tableInfo.recsInMemory).append("\n"); + sb.append("").append(tableInfo.ingestRate).append("\n"); + sb.append("").append(tableInfo.ingestByteRate).append("\n"); + sb.append("").append(tableInfo.queryRate).append("\n"); + sb.append("").append(tableInfo.queryRate).append("\n"); + int running = 0; + int queued = 0; + Compacting compacting = entry.getValue().majors; + if (compacting != null) { + running = compacting.running; + queued = compacting.queued; + } + sb.append("").append("").append(running).append("").append("").append(queued).append("") + .append("\n"); + sb.append("
\n"); + } + sb.append("\n
\n"); + + sb.append("\n\n"); + sb.append("").append(totalIngest).append("\n"); + sb.append("").append(totalQuery).append("\n"); + sb.append("").append(disk).append("\n"); + sb.append("").append(totalEntries).append("\n"); + sb.append("\n"); + } + + @Override + protected void pageEnd(HttpServletRequest req, HttpServletResponse resp, StringBuilder sb) { + sb.append("\n
\n"); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java ---------------------------------------------------------------------- diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java index 90bb0d4,0000000..1c464d4 mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java @@@ -1,105 -1,0 +1,106 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.servlets.trace; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Date; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.http.HttpServletRequest; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.trace.TraceFormatter; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.monitor.servlets.BasicServlet; +import org.apache.accumulo.server.client.HdfsZooInstance; + +abstract class Basic extends BasicServlet { + + private static final long serialVersionUID = 1L; + + public static String getStringParameter(HttpServletRequest req, String name, String defaultValue) { + String result = req.getParameter(name); + if (result == null) { + return defaultValue; + } + return result; + } + + public static int getIntParameter(HttpServletRequest req, String name, int defaultMinutes) { + String valueString = req.getParameter(name); + if (valueString == null) + return defaultMinutes; + int result = 0; + try { + result = Integer.parseInt(valueString); + } catch (NumberFormatException ex) { + return defaultMinutes; + } + return result; + } + + public static String dateString(long millis) { + return TraceFormatter.formatDate(new Date(millis)); + } + + protected Scanner getScanner(StringBuilder sb) throws AccumuloException, AccumuloSecurityException { + AccumuloConfiguration conf = Monitor.getSystemConfiguration(); + String principal = conf.get(Property.TRACE_USER); + 
AuthenticationToken at; + Map loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX); + if (loginMap.isEmpty()) { + Property p = Property.TRACE_PASSWORD; - at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8)); ++ at = new PasswordToken(conf.get(p).getBytes(UTF_8)); + } else { + Properties props = new Properties(); + int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length(); + for (Entry entry : loginMap.entrySet()) { + props.put(entry.getKey().substring(prefixLength), entry.getValue()); + } + + AuthenticationToken token = Property.createInstanceFromPropertyName(conf, Property.TRACE_TOKEN_TYPE, AuthenticationToken.class, new PasswordToken()); + token.init(props); + at = token; + } + + String table = conf.get(Property.TRACE_TABLE); + try { + Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at); + if (!conn.tableOperations().exists(table)) { + return new NullScanner(); + } + Scanner scanner = conn.createScanner(table, conn.securityOperations().getUserAuthorizations(principal)); + return scanner; + } catch (AccumuloSecurityException ex) { + sb.append("

Unable to read trace table: check trace username and password configuration.

\n"); + return null; + } catch (TableNotFoundException ex) { + return new NullScanner(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index e3ae0c8,0000000..6094911 mode 100644,000000..100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@@ -1,324 -1,0 +1,326 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tracer; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.trace.TraceFormatter; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.trace.instrument.Span; +import 
org.apache.accumulo.trace.thrift.RemoteSpan; +import org.apache.accumulo.trace.thrift.SpanReceiver.Iface; +import org.apache.accumulo.trace.thrift.SpanReceiver.Processor; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; + +public class TraceServer implements Watcher { + + final private static Logger log = Logger.getLogger(TraceServer.class); + final private ServerConfiguration serverConfiguration; + final private TServer server; + final private AtomicReference writer; + final private Connector connector; + final String table; + + private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) { + m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len)); + } + + static class ByteArrayTransport extends TTransport { + TByteArrayOutputStream out = new TByteArrayOutputStream(); + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void open() throws TTransportException {} + + @Override + public void close() {} + + @Override + public int read(byte[] buf, int off, int len) { + return 0; + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + out.write(buf, off, len); + } + + public byte[] get() { + return out.get(); + } + + public int len() { + return out.len(); + } + } + + class Receiver implements Iface { + @Override + public void span(RemoteSpan s) throws TException { + String idString = Long.toHexString(s.traceId); + String startString = Long.toHexString(s.start); + Mutation spanMutation = new Mutation(new Text(idString)); + Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString)); + long diff = s.stop - s.start; - indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(Constants.UTF8))); ++ indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(UTF_8))); + ByteArrayTransport transport = new ByteArrayTransport(); + TCompactProtocol protocol = new TCompactProtocol(transport); + s.write(protocol); + String parentString = Long.toHexString(s.parentId); + if (s.parentId == Span.ROOT_SPAN_ID) + parentString = ""; + put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len()); + // Map the root span to time so we can look up traces by time + Mutation timeMutation = null; + if (s.parentId == Span.ROOT_SPAN_ID) { + timeMutation = new Mutation(new Text("start:" + startString)); + put(timeMutation, "id", idString, transport.get(), transport.len()); + } + try { + final BatchWriter writer = TraceServer.this.writer.get(); + /* + * Check for null, because we expect spans to come in much faster than flush calls. In the case of failure, we'd rather avoid logging tons of NPEs. 
+ */ + if (null == writer) { + log.warn("writer is not ready; discarding span."); + return; + } + writer.addMutation(spanMutation); + writer.addMutation(indexMutation); + if (timeMutation != null) + writer.addMutation(timeMutation); + } catch (MutationsRejectedException exception) { + log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception); + if (log.isDebugEnabled()) { + log.debug("discarded span due to rejection of mutation: " + spanMutation, exception); + } + /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */ + } catch (RuntimeException exception) { + log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception); + log.debug("unable to write mutation to table due to exception.", exception); + } + } + + } + + public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception { + this.serverConfiguration = serverConfiguration; + AccumuloConfiguration conf = serverConfiguration.getConfiguration(); + table = conf.get(Property.TRACE_TABLE); + Connector connector = null; + while (true) { + try { + String principal = conf.get(Property.TRACE_USER); + AuthenticationToken at; + Map loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX); + if (loginMap.isEmpty()) { + Property p = Property.TRACE_PASSWORD; - at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8)); ++ at = new PasswordToken(conf.get(p).getBytes(UTF_8)); + } else { + Properties props = new Properties(); + AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class) + .newInstance(); + + int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length(); + for (Entry entry : loginMap.entrySet()) { + props.put(entry.getKey().substring(prefixLength), entry.getValue()); + } + + token.init(props); + + at = token; + } + + connector = serverConfiguration.getInstance().getConnector(principal, at); + if (!connector.tableOperations().exists(table)) { + connector.tableOperations().create(table); + IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName()); + AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l); + connector.tableOperations().attachIterator(table, setting); + } + connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName()); + break; + } catch (Exception ex) { + log.info("Waiting to checking/create the trace table.", ex); + UtilWaitThread.sleep(1000); + } + } + this.connector = connector; + // make sure we refer to the final variable from now on. 
+ connector = null; + + int port = conf.getPort(Property.TRACE_PORT); + final ServerSocket sock = ServerSocketChannel.open().socket(); + sock.setReuseAddress(true); + sock.bind(new InetSocketAddress(hostname, port)); + final TServerTransport transport = new TServerSocket(sock); + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + options.processor(new Processor(new Receiver())); + server = new TThreadPoolServer(options); + registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort()); + writer = new AtomicReference(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS))); + } + + public void run() throws Exception { + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + flush(); + } + }, 1000, 1000); + server.serve(); + } + + private void flush() { + try { + final BatchWriter writer = this.writer.get(); + if (null != writer) { + writer.flush(); + } + } catch (MutationsRejectedException exception) { + log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); + log.debug("flushing traces failed due to exception", exception); + resetWriter(); + /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */ + } catch (RuntimeException exception) { + log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); + log.debug("flushing traces failed due to exception", exception); + resetWriter(); + } + } + + private void resetWriter() { + BatchWriter writer = null; + try { + writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)); + } catch (Exception ex) { + log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex); + log.debug("batch writer creation failed with exception.", ex); + } finally { + /* Trade in the new writer (even if null) for the one we need to close. */ + writer = this.writer.getAndSet(writer); + try { + if (null != writer) { + writer.close(); + } + } catch (Exception ex) { + log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. 
cause: " + ex); + log.debug("batch writer close failed with exception", ex); + } + } + } + + private void registerInZooKeeper(String name) throws Exception { + String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS; + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(Constants.UTF8)); ++ String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(UTF_8)); + zoo.exists(path, this); + } + + public static void main(String[] args) throws Exception { + SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); + ServerOpts opts = new ServerOpts(); + final String app = "tracer"; + opts.parseArgs(app, args); + Accumulo.setupLogging(app); + Instance instance = HdfsZooInstance.getInstance(); + ServerConfiguration conf = new ServerConfiguration(instance); + VolumeManager fs = VolumeManagerImpl.get(); + Accumulo.init(fs, conf, app); + String hostname = opts.getAddress(); + TraceServer server = new TraceServer(conf, hostname); + Accumulo.enableTracing(hostname, app); + server.run(); + log.info("tracer stopping"); + } + + @Override + public void process(WatchedEvent event) { + log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState()); + if (event.getState() == KeeperState.Expired) { + log.warn("Trace server lost zookeeper registration at " + event.getPath()); + server.stop(); + } else if (event.getType() == EventType.NodeDeleted) { + log.warn("Trace server zookeeper entry lost " + event.getPath()); + server.stop(); + } + if (event.getPath() != null) { + try { + if (ZooReaderWriter.getInstance().exists(event.getPath(), this)) + return; + } catch (Exception ex) { + log.error(ex, ex); + } + log.warn("Trace server unable to reset watch on zookeeper registration"); + server.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index 39bf81e,0000000..4d9a6c2 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@@ -1,75 -1,0 +1,76 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.io.IOException; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +/** + * Copy failed bulk imports. + */ +public class BulkFailedCopyProcessor implements Processor { + + private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class); + + @Override + public Processor newProcessor() { + return new BulkFailedCopyProcessor(); + } + + @Override + public void process(String workID, byte[] data) { + - String paths[] = new String(data, Constants.UTF8).split(","); ++ String paths[] = new String(data, UTF_8).split(","); + + Path orig = new Path(paths[0]); + Path dest = new Path(paths[1]); + Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); + + try { + VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration()); + FileSystem origFs = TraceFileSystem.wrap(vm.getVolumeByPath(orig).getFileSystem()); + FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + + FileUtil.copy(origFs, orig, destFs, tmp, false, true, CachedConfiguration.getInstance()); + destFs.rename(tmp, dest); + log.debug("copied " + orig + " to " + dest); + } catch (IOException ex) { + try { + VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration()); + FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + destFs.create(dest).close(); + log.warn(" marked " + dest + " failed", ex); + } catch (IOException e) { + log.error("Unable to create failure flag file " + dest, e); + } + } + + } + +}
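
The recurring change across these files is replacing org.apache.accumulo.core.Constants.UTF8 with a static import of Guava's com.google.common.base.Charsets.UTF_8 wherever strings are encoded, bytes are decoded, or writers are built. The sketch below is illustrative only, not code from this commit: the class name is hypothetical and it assumes Guava is on the classpath, as the imports in the diff already imply.

// Utf8PatternSketch.java -- hypothetical, illustrative sketch of the charset
// idiom this merge standardizes on: a statically imported UTF-8 Charset
// constant instead of the project-local Constants.UTF8, so no conversion ever
// falls back to the platform default charset.
import static com.google.common.base.Charsets.UTF_8;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

public class Utf8PatternSketch {
  public static void main(String[] args) throws IOException {
    // String -> bytes with an explicit charset, as in the PasswordToken and
    // ZooKeeper payloads above.
    byte[] payload = "tracer:12345".getBytes(UTF_8);

    // bytes -> String with the same explicit charset, as in
    // BulkFailedCopyProcessor.process.
    String decoded = new String(payload, UTF_8);

    // A writer stacked on an OutputStream with an explicit charset, as in the
    // ShellServlet ConsoleReader/PrintWriter setup.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, UTF_8));
    writer.println(decoded);
    writer.flush();

    System.out.println(out.toString(UTF_8.name()));
  }
}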