Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EF87310EC6 for ; Thu, 23 Jan 2014 07:36:27 +0000 (UTC) Received: (qmail 13436 invoked by uid 500); 23 Jan 2014 07:36:27 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 13343 invoked by uid 500); 23 Jan 2014 07:36:26 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 13161 invoked by uid 99); 23 Jan 2014 07:36:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jan 2014 07:36:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BEF428AC4F0; Thu, 23 Jan 2014 07:36:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: busbey@apache.org To: commits@accumulo.apache.org Date: Thu, 23 Jan 2014 07:36:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/23] git commit: ACCUMULO-2213 Make writer recovery in the Tracer more robust. ACCUMULO-2213 Make writer recovery in the Tracer more robust. * Cleans up writer resetting in the TraceServer, avoids overly broad catching. * Tones down log levels in TraceServer to WARN because trace information is transient and we retry everything. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfafd9c1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfafd9c1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfafd9c1 Branch: refs/heads/1.5.1-SNAPSHOT Commit: dfafd9c1104d8359ca1eabe345661c541766d57f Parents: 57f9b6c Author: Sean Busbey Authored: Tue Jan 21 14:05:56 2014 -0600 Committer: Sean Busbey Committed: Wed Jan 22 16:36:34 2014 -0600 ---------------------------------------------------------------------- .../accumulo/server/trace/TraceServer.java | 68 ++++++++++++++------ 1 file changed, 49 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfafd9c1/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java index 4d89e9c..0e2010a 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java +++ b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.trace; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.ServerSocketChannel; +import java.util.concurrent.atomic.AtomicReference; import java.util.TimerTask; import org.apache.accumulo.cloudtrace.instrument.Span; @@ -27,6 +28,7 @@ import org.apache.accumulo.cloudtrace.thrift.SpanReceiver; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import 
org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -64,8 +66,8 @@ public class TraceServer implements Watcher { final private static Logger log = Logger.getLogger(TraceServer.class); final private AccumuloConfiguration conf; final private TServer server; - private BatchWriter writer = null; - private Connector connector; + final private AtomicReference writer; + final private Connector connector; final String table; private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) { @@ -128,16 +130,27 @@ public class TraceServer implements Watcher { put(timeMutation, "id", idString, transport.get(), transport.len()); } try { - if (writer == null) - resetWriter(); - if (writer == null) + final BatchWriter writer = TraceServer.this.writer.get(); + /* Check for null, because we expect spans to come in much faster than flush calls. + In the case of failure, we'd rather avoid logging tons of NPEs. + */ + if (null == writer) { + log.warn("writer is not ready; discarding span."); return; + } writer.addMutation(spanMutation); writer.addMutation(indexMutation); if (timeMutation != null) writer.addMutation(timeMutation); - } catch (Exception ex) { - log.error("Unable to write mutation to table: " + spanMutation, ex); + } catch (MutationsRejectedException exception) { + log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception); + if (log.isDebugEnabled()) { + log.debug("discarded span due to rejection of mutation: " + spanMutation, exception); + } + /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */ + } catch (RuntimeException exception) { + log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. 
cause: " + exception); + log.debug("unable to write mutation to table due to exception.", exception); } } @@ -147,6 +160,7 @@ public class TraceServer implements Watcher { Accumulo.init("tracer"); conf = ServerConfiguration.getSystemConfiguration(); table = conf.get(Property.TRACE_TABLE); + Connector connector = null; while (true) { try { connector = HdfsZooInstance.getInstance().getConnector(conf.get(Property.TRACE_USER), conf.get(Property.TRACE_PASSWORD).getBytes()); @@ -160,6 +174,9 @@ public class TraceServer implements Watcher { UtilWaitThread.sleep(1000); } } + this.connector = connector; + // make sure we refer to the final variable from now on. + connector = null; int port = conf.getPort(Property.TRACE_PORT); final ServerSocket sock = ServerSocketChannel.open().socket(); @@ -171,8 +188,7 @@ public class TraceServer implements Watcher { server = new TThreadPoolServer(options); final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort()); registerInZooKeeper(AddressUtil.toString(address)); - - writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10); + writer = new AtomicReference(this.connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10)); } public void run() throws Exception { @@ -187,25 +203,39 @@ public class TraceServer implements Watcher { private void flush() { try { - writer.flush(); - } catch (Exception e) { - log.error("Error flushing traces", e); + final BatchWriter writer = this.writer.get(); + if (null != writer) { + writer.flush(); + } + } catch (MutationsRejectedException exception) { + log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); + log.debug("flushing traces failed due to exception", exception); + resetWriter(); + /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. 
*/ + } catch (RuntimeException exception) { + log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); + log.debug("flushing traces failed due to exception", exception); resetWriter(); } } - synchronized private void resetWriter() { + private void resetWriter() { + BatchWriter writer = null; try { - if (writer != null) - writer.close(); + writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10); } catch (Exception ex) { - log.error("Error closing batch writer", ex); + log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex); + log.debug("batch writer creation failed with exception.", ex); } finally { - writer = null; + /* Trade in the new writer (even if null) for the one we need to close. */ + writer = this.writer.getAndSet(writer); try { - writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10); + if (null != writer) { + writer.close(); + } } catch (Exception ex) { - log.error("Unable to create a batch writer: " + ex); + log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex); + log.debug("batch writer close failed with exception", ex); } } }