accumulo-commits mailing list archives

From mwa...@apache.org
Subject [1/3] accumulo-testing git commit: ACCUMULO-4510 Moved continuous ingest code from Accumulo repo
Date Mon, 23 Jan 2017 22:03:56 GMT
Repository: accumulo-testing
Updated Branches:
  refs/heads/master c3c8ff14d -> b81229d78


http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/UndefinedAnalyzer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/UndefinedAnalyzer.java
b/core/src/main/java/org/apache/accumulo/testing/core/continuous/UndefinedAnalyzer.java
new file mode 100644
index 0000000..fe26e02
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/UndefinedAnalyzer.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.cli.ClientOnDefaultTable;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * BUGS This code does not handle the fact that these files could include log events from previous months. It therefore assumes all dates are in the current
+ * month. One solution might be to skip log files that haven't been touched in the last month, but that doesn't prevent newer files that have old dates in them.
+ *
+ */
+public class UndefinedAnalyzer {
+
+  static class UndefinedNode {
+
+    public UndefinedNode(String undef2, String ref2) {
+      this.undef = undef2;
+      this.ref = ref2;
+    }
+
+    String undef;
+    String ref;
+  }
+
+  static class IngestInfo {
+
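+    // Maps each ingester UUID to a reverse-ordered map of (entries written at flush -> wall clock time of that flush).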
+    Map<String,TreeMap<Long,Long>> flushes = new HashMap<>();
+
+    public IngestInfo(String logDir) throws Exception {
+      File dir = new File(logDir);
+      File[] ingestLogs = dir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return name.endsWith("ingest.out");
+        }
+      });
+
+      if (ingestLogs != null) {
+        for (File log : ingestLogs) {
+          parseLog(log);
+        }
+      }
+    }
+
+    private void parseLog(File log) throws Exception {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(log), UTF_8));
+      String line;
+      TreeMap<Long,Long> tm = null;
+      try {
+        while ((line = reader.readLine()) != null) {
+          if (!line.startsWith("UUID"))
+            continue;
+          String[] tokens = line.split("\\s");
+          String time = tokens[1];
+          String uuid = tokens[2];
+
+          if (flushes.containsKey(uuid)) {
+            System.err.println("WARN Duplicate uuid " + log);
+            return;
+          }
+
+          tm = new TreeMap<>(Collections.reverseOrder());
+          tm.put(0l, Long.parseLong(time));
+          flushes.put(uuid, tm);
+          break;
+
+        }
+        if (tm == null) {
+          System.err.println("WARN Bad ingest log " + log);
+          return;
+        }
+
+        while ((line = reader.readLine()) != null) {
+          String[] tokens = line.split("\\s");
+
+          if (!tokens[0].equals("FLUSH"))
+            continue;
+
+          String time = tokens[1];
+          String count = tokens[4];
+
+          tm.put(Long.parseLong(count), Long.parseLong(time));
+        }
+      } finally {
+        reader.close();
+      }
+    }
+
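+    // Returns flush times for entry counts at or below the given count, most recent flush first, or null if the uuid is unknown.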
+    Iterator<Long> getTimes(String uuid, long count) {
+      TreeMap<Long,Long> tm = flushes.get(uuid);
+
+      if (tm == null)
+        return null;
+
+      return tm.tailMap(count).values().iterator();
+    }
+  }
+
+  static class TabletAssignment {
+    String tablet;
+    String endRow;
+    String prevEndRow;
+    String server;
+    long time;
+
+    TabletAssignment(String tablet, String er, String per, String server, long time) {
+      this.tablet = tablet;
+      this.endRow = er;
+      this.prevEndRow = per;
+      this.server = server;
+      this.time = time;
+    }
+
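+    // True when the row falls in this tablet's range (prevEndRow, endRow].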
+    public boolean contains(String row) {
+      return prevEndRow.compareTo(row) < 0 && endRow.compareTo(row) >= 0;
+    }
+  }
+
+  static class TabletHistory {
+
+    List<TabletAssignment> assignments = new ArrayList<>();
+
+    TabletHistory(String tableId, String acuLogDir) throws Exception {
+      File dir = new File(acuLogDir);
+      File[] masterLogs = dir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return name.matches("master.*debug.log.*");
+        }
+      });
+
+      SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss,SSS yyyy MM");
+      String currentYear = (Calendar.getInstance().get(Calendar.YEAR)) + "";
+      String currentMonth = (Calendar.getInstance().get(Calendar.MONTH) + 1) + "";
+
+      if (masterLogs != null) {
+        for (File masterLog : masterLogs) {
+
+          BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(masterLog), UTF_8));
+          String line;
+          try {
+            while ((line = reader.readLine()) != null) {
+              if (line.contains("TABLET_LOADED")) {
+                String[] tokens = line.split("\\s+");
+                String tablet = tokens[8];
+                String server = tokens[10];
+
+                int pos1 = -1;
+                int pos2 = -1;
+                int pos3 = -1;
+
+                for (int i = 0; i < tablet.length(); i++) {
+                  if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+                    if (pos1 == -1) {
+                      pos1 = i;
+                    } else if (pos2 == -1) {
+                      pos2 = i;
+                    } else {
+                      pos3 = i;
+                    }
+                  }
+                }
+
+                if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+                  String tid = tablet.substring(0, pos1);
+                  String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
+                  String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+                  if (tid.equals(tableId)) {
+                    // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
+                    Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
+                    // System.out.println(" "+date);
+
+                    assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+
+                  }
+                } else if (!tablet.startsWith("!0")) {
+                  System.err.println("Cannot parse tablet " + tablet);
+                }
+
+              }
+            }
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+
+    TabletAssignment findMostRecentAssignment(String row, long time1, long time2) {
+
+      long latest = Long.MIN_VALUE;
+      TabletAssignment ret = null;
+
+      for (TabletAssignment assignment : assignments) {
+        if (assignment.contains(row) && assignment.time <= time2 && assignment.time > latest) {
+          latest = assignment.time;
+          ret = assignment;
+        }
+      }
+
+      return ret;
+    }
+  }
+
+  static class Opts extends ClientOnDefaultTable {
+    @Parameter(names = "--logdir", description = "directory containing the log files", required = true)
+    String logDir;
+
+    Opts() {
+      super("ci");
+    }
+  }
+
+  /**
+   * Class to analyze undefined references and accumulo logs to isolate the time/tablet where data was lost.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchScannerOpts bsOpts = new BatchScannerOpts();
+    opts.parseArgs(UndefinedAnalyzer.class.getName(), args, bsOpts);
+
+    List<UndefinedNode> undefs = new ArrayList<>();
+
+    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, UTF_8));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      String[] tokens = line.split("\\s");
+      String undef = tokens[0];
+      String ref = tokens[1];
+
+      undefs.add(new UndefinedNode(undef, ref));
+    }
+
+    Connector conn = opts.getConnector();
+    BatchScanner bscanner = conn.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
+    bscanner.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+    List<Range> refs = new ArrayList<>();
+
+    for (UndefinedNode undefinedNode : undefs)
+      refs.add(new Range(new Text(undefinedNode.ref)));
+
+    bscanner.setRanges(refs);
+
+    HashMap<String,List<String>> refInfo = new HashMap<>();
+
+    for (Entry<Key,Value> entry : bscanner) {
+      String ref = entry.getKey().getRow().toString();
+      List<String> vals = refInfo.get(ref);
+      if (vals == null) {
+        vals = new ArrayList<>();
+        refInfo.put(ref, vals);
+      }
+
+      vals.add(entry.getValue().toString());
+    }
+
+    bscanner.close();
+
+    IngestInfo ingestInfo = new IngestInfo(opts.logDir);
+    TabletHistory tabletHistory = new TabletHistory(Tables.getTableId(conn.getInstance(), opts.getTableName()), opts.logDir);
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
+    for (UndefinedNode undefinedNode : undefs) {
+
+      List<String> refVals = refInfo.get(undefinedNode.ref);
+      if (refVals != null) {
+        for (String refVal : refVals) {
+          TabletAssignment ta = null;
+
+          String[] tokens = refVal.split(":");
+
+          String uuid = tokens[0];
+          String count = tokens[1];
+
+          String t1 = "";
+          String t2 = "";
+
+          Iterator<Long> times = ingestInfo.getTimes(uuid, Long.parseLong(count, 16));
+          if (times != null) {
+            if (times.hasNext()) {
+              long time2 = times.next();
+              t2 = sdf.format(new Date(time2));
+              if (times.hasNext()) {
+                long time1 = times.next();
+                t1 = sdf.format(new Date(time1));
+                ta = tabletHistory.findMostRecentAssignment(undefinedNode.undef, time1, time2);
+              }
+            }
+          }
+
+          if (ta == null)
+            System.out.println(undefinedNode.undef + " " + undefinedNode.ref + " " + uuid + " " + t1 + " " + t2);
+          else
+            System.out.println(undefinedNode.undef + " " + undefinedNode.ref + " " + ta.tablet + " " + ta.server + " " + uuid + " " + t1 + " " + t2);
+
+        }
+      } else {
+        System.out.println(undefinedNode.undef + " " + undefinedNode.ref);
+      }
+
+    }
+
+  }
+
+}
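
As an aside on the BUGS javadoc above: the master debug log lines that TabletHistory parses carry only a day-of-month and time ("dd HH:mm:ss,SSS"), so the code appends the current year and month before handing the string to SimpleDateFormat. The minimal sketch below (the LogDateSketch class is hypothetical, made up purely for illustration) shows that parsing step in isolation; a line actually written in a previous month would be re-dated into the current one.

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

// Hypothetical, standalone illustration of the date handling noted in the BUGS javadoc of
// UndefinedAnalyzer: only "dd HH:mm:ss,SSS" comes from the log line; year and month are guessed.
public class LogDateSketch {
  public static void main(String[] args) throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss,SSS yyyy MM");
    String currentYear = String.valueOf(Calendar.getInstance().get(Calendar.YEAR));
    String currentMonth = String.valueOf(Calendar.getInstance().get(Calendar.MONTH) + 1);
    // "15 08:30:00,123" is all the log line provides; the year and month are assumptions.
    Date parsed = sdf.parse("15 08:30:00,123 " + currentYear + " " + currentMonth);
    System.out.println(parsed);
  }
}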

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c68e5a..c7cbc8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
   </modules>
 
   <properties>
-    <accumulo.version>1.8.0</accumulo.version>
+    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
     <hadoop.version>2.6.4</hadoop.version>
     <zookeeper.version>3.4.9</zookeeper.version>
     <slf4j.version>1.7.21</slf4j.version>

