accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/5] accumulo git commit: ACCUMULO-3351 Try to reset the writer if we don't have one but the table exists.
Date Fri, 21 Nov 2014 04:04:18 GMT
ACCUMULO-3351 Try to reset the writer if we don't have one but the table exists.

We have to avoid the case where we spin indefinitely trying to process
traces (but cannot because we don't have a writer). If we don't have
a writer, but the trace table exists, try to get a new writer.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/32c18d59
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/32c18d59
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/32c18d59

Branch: refs/heads/master
Commit: 32c18d59ffe3581134d6a0e6e83494fb979ee931
Parents: d470f05
Author: Josh Elser <elserj@apache.org>
Authored: Thu Nov 20 15:57:59 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Nov 20 15:57:59 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/tracer/TraceServer.java |   5 +
 .../test/TracerRecoversAfterOfflineTableIT.java | 128 +++++++++++++++++++
 2 files changed, 133 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/32c18d59/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 6094911..70e8d7d 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -243,6 +243,11 @@ public class TraceServer implements Watcher {
       final BatchWriter writer = this.writer.get();
       if (null != writer) {
         writer.flush();
+      } else {
+        // We don't have a writer. If the table exists, try to make a new writer.
+        if (connector.tableOperations().exists(table)) {
+          resetWriter();
+        }
       }
     } catch (MutationsRejectedException exception) {
       log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/32c18d59/test/src/test/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java b/test/src/test/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
new file mode 100644
index 0000000..3b32150
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.trace.TraceDump;
+import org.apache.accumulo.core.trace.TraceDump.Printer;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tracer.TraceServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TracerRecoversAfterOfflineTableIT extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Process tracer = null;
+    Connector conn = getConnector();
+    if (!conn.tableOperations().exists("trace")) {
+      MiniAccumuloClusterImpl mac = cluster;
+      tracer = mac.exec(TraceServer.class);
+      while (!conn.tableOperations().exists("trace")) {
+        UtilWaitThread.sleep(1000);
+      }
+    }
+
+    log.info("Deleting trace table records and taking table offline");
+    conn.tableOperations().deleteRows("trace", null, null);
+    conn.tableOperations().offline("trace", true);
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    log.info("Start a distributed trace span");
+
+    DistributedTrace.enable(conn.getInstance(), new ZooReader(conn.getInstance().getZooKeepers(), 30 * 1000), "testTrace", "localhost");
+    Span root = Trace.on("traceTest");
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    Mutation m = new Mutation("m");
+    m.put("a", "b", "c");
+    bw.addMutation(m);
+    bw.close();
+    root.stop();
+
+    log.info("Bringing trace table back online");
+    conn.tableOperations().online("trace", true);
+
+    log.info("Trace table is online, should be able to find trace");
+
+    final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY);
+    scanner.setRange(new Range(new Text(Long.toHexString(root.traceId()))));
+    while (true) {
+      final StringBuffer finalBuffer = new StringBuffer();
+      int traceCount = TraceDump.printTrace(scanner, new Printer() {
+        @Override
+        public void print(final String line) {
+          try {
+            finalBuffer.append(line).append("\n");
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      });
+      String traceOutput = finalBuffer.toString();
+      log.info("Trace output:" + traceOutput);
+      if (traceCount > 0) {
+        int lastPos = 0;
+        for (String part : "traceTest,close,binMutations".split(",")) {
+          log.info("Looking in trace output for '" + part + "'");
+          int pos = traceOutput.indexOf(part);
+          assertTrue("Did not find '" + part + "' in output", pos > 0);
+          assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly", pos > lastPos);
+          lastPos = pos;
+        }
+        break;
+      } else {
+        log.info("Ignoring trace output as traceCount not greater than zero: " + traceCount);
+        Thread.sleep(1000);
+      }
+    }
+    if (tracer != null) {
+      tracer.destroy();
+    }
+  }
+
+}


Mime
View raw message