accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jma...@apache.org
Subject [accumulo] branch main updated: Update LogReader to utilize RecoveryLogsIterator (#2181)
Date Wed, 14 Jul 2021 12:16:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jmanno pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f9f1d3f  Update LogReader to utilize RecoveryLogsIterator (#2181)
f9f1d3f is described below

commit f9f1d3f6578c400862e9f0e8d82a49bf9c4a5392
Author: Jeffrey Manno <jeffreymanno15@gmail.com>
AuthorDate: Wed Jul 14 08:15:52 2021 -0400

    Update LogReader to utilize RecoveryLogsIterator (#2181)
    
    * Adds utilization of RecoveryLogsIterator to read sorted Rfiles inside LogReader.java
    * Removed old implementation of RecoveryLogReader and removed associated test
    * Added unit test for RecoveryLogsIterator, RecoveryLogsIteratorTest
    
    Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
---
 .../accumulo/tserver/log/RecoveryLogReader.java    | 326 ---------------------
 .../accumulo/tserver/log/RecoveryLogsIterator.java |   4 +-
 .../apache/accumulo/tserver/logger/LogReader.java  |  31 +-
 .../tserver/log/RecoveryLogsIteratorTest.java      | 251 ++++++++++++++++
 .../tserver/log/RecoveryLogsReaderTest.java        | 216 --------------
 5 files changed, 273 insertions(+), 555 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
deleted file mode 100644
index bde5a1c..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.tserver.log;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.AbstractMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.PriorityQueue;
-
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.tserver.logger.LogEvents;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MapFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-
-/**
- * A class which reads sorted recovery logs produced from a single WAL.
- *
- * Presently only supports next() and seek() and works on all the Map directories within a
- * directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that
- * result in Map output files.
- */
-public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
-
-  /**
-   * Group together the next key/value from a Reader with the Reader
-   */
-  private static class Index implements Comparable<Index> {
-    Reader reader;
-    WritableComparable<?> key;
-    Writable value;
-    boolean cached = false;
-
-    private static Object create(java.lang.Class<?> klass) {
-      try {
-        return klass.getConstructor().newInstance();
-      } catch (Exception t) {
-        throw new RuntimeException("Unable to construct objects to use for comparison");
-      }
-    }
-
-    public Index(Reader reader) {
-      this.reader = reader;
-      key = (WritableComparable<?>) create(reader.getKeyClass());
-      value = (Writable) create(reader.getValueClass());
-    }
-
-    private void cache() throws IOException {
-      if (!cached && reader.next(key, value)) {
-        cached = true;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(key);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return this == obj || (obj != null && obj instanceof Index && compareTo((Index) obj) == 0);
-    }
-
-    @Override
-    public int compareTo(Index o) {
-      try {
-        cache();
-        o.cache();
-        // no more data: always goes to the end
-        if (!cached)
-          return 1;
-        if (!o.cached)
-          return -1;
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        int result = ((WritableComparable) key).compareTo(o.key);
-        return result;
-      } catch (IOException ex) {
-        throw new UncheckedIOException(ex);
-      }
-    }
-  }
-
-  private PriorityQueue<Index> heap = new PriorityQueue<>();
-  private Iterator<Entry<LogFileKey,LogFileValue>> iter;
-
-  public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException {
-    this(fs, directory, null, null);
-  }
-
-  public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, LogFileKey end)
-      throws IOException {
-    boolean foundFinish = false;
-    for (FileStatus child : fs.listStatus(directory)) {
-      if (child.getPath().getName().startsWith("_"))
-        continue;
-      if (SortedLogState.isFinished(child.getPath().getName())) {
-        foundFinish = true;
-        continue;
-      }
-      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
-        continue;
-      }
-      FileSystem ns = fs.getFileSystemByPath(child.getPath());
-      heap.add(new Index(new Reader(ns.makeQualified(child.getPath()), ns.getConf())));
-    }
-    if (!foundFinish)
-      throw new IOException(
-          "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
-
-    iter = new SortCheckIterator(new RangeIterator(start, end));
-  }
-
-  private static void copy(Writable src, Writable dest) throws IOException {
-    // not exactly efficient...
-    DataOutputBuffer output = new DataOutputBuffer();
-    src.write(output);
-    DataInputBuffer input = new DataInputBuffer();
-    input.reset(output.getData(), output.getLength());
-    dest.readFields(input);
-  }
-
-  @VisibleForTesting
-  synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
-    Index elt = heap.remove();
-    try {
-      elt.cache();
-      if (elt.cached) {
-        copy(elt.key, key);
-        copy(elt.value, val);
-        elt.cached = false;
-      } else {
-        return false;
-      }
-    } finally {
-      heap.add(elt);
-    }
-    return true;
-  }
-
-  @VisibleForTesting
-  synchronized boolean seek(WritableComparable<?> key) throws IOException {
-    PriorityQueue<Index> reheap = new PriorityQueue<>(heap.size());
-    boolean result = false;
-    for (Index index : heap) {
-      try {
-        WritableComparable<?> found = index.reader.getClosest(key, index.value, true);
-        if (found != null && found.equals(key)) {
-          result = true;
-        }
-      } catch (EOFException ex) {
-        // thrown if key is beyond all data in the map
-      }
-      index.cached = false;
-      reheap.add(index);
-    }
-    heap = reheap;
-    return result;
-  }
-
-  @Override
-  public void close() throws IOException {
-    IOException problem = null;
-    for (Index index : heap) {
-      try {
-        index.reader.close();
-      } catch (IOException ex) {
-        problem = ex;
-      }
-    }
-    if (problem != null)
-      throw problem;
-    heap = null;
-  }
-
-  /**
-   * Ensures source iterator provides data in sorted order
-   */
-  @VisibleForTesting
-  static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
-
-    private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
-
-    SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
-      this.source = Iterators.peekingIterator(source);
-
-    }
-
-    @Override
-    public boolean hasNext() {
-      return source.hasNext();
-    }
-
-    @Override
-    public Entry<LogFileKey,LogFileValue> next() {
-      Entry<LogFileKey,LogFileValue> next = source.next();
-      if (source.hasNext()) {
-        Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
-            "Keys not in order %s %s", next.getKey(), source.peek().getKey());
-      }
-      return next;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove");
-    }
-  }
-
-  private class RangeIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
-
-    private LogFileKey key = new LogFileKey();
-    private LogFileValue value = new LogFileValue();
-    private boolean hasNext;
-    private LogFileKey end;
-
-    private boolean next(LogFileKey key, LogFileValue value) throws IOException {
-      try {
-        return RecoveryLogReader.this.next(key, value);
-      } catch (EOFException e) {
-        return false;
-      }
-    }
-
-    RangeIterator(LogFileKey start, LogFileKey end) throws IOException {
-      this.end = end;
-
-      if (start != null) {
-        hasNext = next(key, value);
-
-        if (hasNext && key.event != LogEvents.OPEN) {
-          throw new IllegalStateException("First log entry value is not OPEN");
-        }
-
-        seek(start);
-      }
-
-      hasNext = next(key, value);
-
-      if (hasNext && start != null && key.compareTo(start) < 0) {
-        throw new IllegalStateException("First key is less than start " + key + " " + start);
-      }
-
-      if (hasNext && end != null && key.compareTo(end) > 0) {
-        hasNext = false;
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      return hasNext;
-    }
-
-    @Override
-    public Entry<LogFileKey,LogFileValue> next() {
-      Preconditions.checkState(hasNext);
-      Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);
-
-      key = new LogFileKey();
-      value = new LogFileValue();
-      try {
-        hasNext = next(key, value);
-        if (hasNext && end != null && key.compareTo(end) > 0) {
-          hasNext = false;
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-
-      return entry;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove");
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    return iter.hasNext();
-  }
-
-  @Override
-  public Entry<LogFileKey,LogFileValue> next() {
-    return iter.next();
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException("remove");
-  }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index 0f5e259..75999d7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -59,12 +59,12 @@ public class RecoveryLogsIterator
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
       LogFileKey end, boolean checkFirstKey) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
     scanners = new ArrayList<>();
-    Range range = LogFileKey.toRange(start, end);
+    Range range = start == null ? null : LogFileKey.toRange(start, end);
     var vm = context.getVolumeManager();
 
     for (Path logDir : recoveryLogDirs) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 5c0ce29..a984418 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
@@ -35,11 +36,13 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;
+import org.apache.accumulo.tserver.log.RecoveryLogsIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
@@ -72,7 +75,7 @@ public class LogReader implements KeywordExecutable {
   }
 
   /**
-   * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
+   * Dump a Log File to stdout. Will read from HDFS or local file system.
    *
    * @param args
    *          - first argument is the file to print
@@ -103,7 +106,8 @@ public class LogReader implements KeywordExecutable {
     }
 
     var siteConfig = SiteConfiguration.auto();
-    try (var fs = VolumeManagerImpl.get(siteConfig, new Configuration())) {
+    ServerContext context = new ServerContext(siteConfig);
+    try (VolumeManager fs = VolumeManagerImpl.get(siteConfig, new Configuration())) {
 
       Matcher rowMatcher = null;
       KeyExtent ke = null;
@@ -123,13 +127,18 @@ public class LogReader implements KeywordExecutable {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
+        // ensure it's a regular non-sorted WAL file, and not a single sorted WAL in RFile format
         if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
+          if (file.endsWith(".rf")) {
+            log.error("Unable to read from a single RFile. A non-sorted WAL file was expected. "
+                + "To read sorted WALs, please pass in a directory containing the sorted recovery logs.");
+            continue;
+          }
+
           try (final FSDataInputStream fsinput = fs.open(path);
               DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
             while (true) {
@@ -146,10 +155,12 @@ public class LogReader implements KeywordExecutable {
             continue;
           }
         } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
+          // read the log entries in a sorted RFile. This has to be a directory that contains the
+          // finished file.
+          try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null,
+              null, false)) {
+            while (rli.hasNext()) {
+              Entry<LogFileKey,LogFileValue> entry = rli.next();
               printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
                   opts.maxMutations);
             }
@@ -200,9 +211,7 @@ public class LogReader implements KeywordExecutable {
       }
 
     }
-
     System.out.println(key);
     System.out.println(LogFileValue.format(value, maxMutations));
   }
-
 }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
new file mode 100644
index 0000000..d6a013e
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
+public class RecoveryLogsIteratorTest {
+
+  private VolumeManager fs;
+  private File workDir;
+  static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
+  static ServerContext context;
+  static LogSorter logSorter;
+
+  @Rule
+  public TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setUp() throws Exception {
+    context = createMock(ServerContext.class);
+    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
+
+    workDir = tempFolder.newFolder();
+    String path = workDir.getAbsolutePath();
+    assertTrue(workDir.delete());
+    fs = VolumeManagerImpl.getLocalForTesting(path);
+    expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+    expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+        .anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    replay(context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.close();
+  }
+
+  static class KeyValue implements Comparable<KeyValue> {
+    public final LogFileKey key;
+    public final LogFileValue value;
+
+    KeyValue() {
+      key = new LogFileKey();
+      value = new LogFileValue();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key) + Objects.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj));
+    }
+
+    @Override
+    public int compareTo(KeyValue o) {
+      return key.compareTo(o.key);
+    }
+  }
+
+  @Test
+  public void testSimpleRLI() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+        assertEquals("TabletId does not match", 1, entry.getKey().tabletId);
+        assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event);
+      }
+    }
+  }
+
+  @Test
+  public void testFinishMarker() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, false);
+
+    assertThrows("Finish marker should not be found", IOException.class,
+        () -> new RecoveryLogsIterator(context, dirs, null, null, false));
+  }
+
+  @Test
+  public void testSingleFile() throws IOException {
+    String destPath = workDir + "/test.rf";
+    fs.create(new Path(destPath));
+
+    assertThrows("Finish marker should not be found for a single file.", IOException.class,
+        () -> new RecoveryLogsIterator(context, Collections.singletonList(new Path(destPath)), null,
+            null, false));
+  }
+
+  @Test
+  public void testCheckFirstKeyFailed() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    assertThrows("First log entry is not OPEN so exception should be thrown.",
+        IllegalStateException.class,
+        () -> new RecoveryLogsIterator(context, dirs, null, null, true));
+  }
+
+  @Test
+  public void testCheckFirstKeyPass() throws IOException {
+    KeyValue keyValue1 = new KeyValue();
+    keyValue1.key.event = OPEN;
+    keyValue1.key.seq = 0;
+    keyValue1.key.tabletId = -1;
+    keyValue1.key.tserverSession = "1";
+
+    KeyValue keyValue2 = new KeyValue();
+    keyValue2.key.event = DEFINE_TABLET;
+    keyValue2.key.seq = 0;
+    keyValue2.key.tabletId = 1;
+    keyValue2.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue1, keyValue2};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, true)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+        assertNotNull(entry.getKey());
+      }
+    }
+  }
+
+  private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<Path> dirs,
+      boolean FinishMarker) throws IOException {
+
+    for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
+      String destPath = workDir + "/dir";
+      FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
+
+      // convert test object to Pairs for LogSorter.
+      List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+      for (KeyValue pair : entry.getValue()) {
+        buffer.add(new Pair<>(pair.key, pair.value));
+      }
+      logSorter.writeBuffer(destPath, buffer, 0);
+
+      if (FinishMarker)
+        ns.create(SortedLogState.getFinishedMarkerPath(destPath));
+
+      dirs.add(new Path(destPath));
+    }
+  }
+}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
deleted file mode 100644
index 5a3a0af..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.tserver.log;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.tserver.log.RecoveryLogReader.SortCheckIterator;
-import org.apache.accumulo.tserver.logger.LogEvents;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapFile.Writer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
-public class RecoveryLogsReaderTest {
-
-  private VolumeManager fs;
-  private File workDir;
-
-  @Rule
-  public TemporaryFolder tempFolder =
-      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
-
-  @Before
-  public void setUp() throws Exception {
-    workDir = tempFolder.newFolder();
-    String path = workDir.getAbsolutePath();
-    assertTrue(workDir.delete());
-    fs = VolumeManagerImpl.getLocalForTesting(path);
-    Path root = new Path("file://" + path);
-    fs.mkdirs(root);
-    fs.create(new Path(root, "finished")).close();
-    FileSystem ns = fs.getFileSystemByPath(root);
-
-    Writer oddWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "odd")),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class));
-    BytesWritable value = new BytesWritable("someValue".getBytes());
-    for (int i = 1; i < 1000; i += 2) {
-      oddWriter.append(new IntWritable(i), value);
-    }
-    oddWriter.close();
-
-    Writer evenWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "even")),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class));
-    for (int i = 0; i < 1000; i += 2) {
-      if (i == 10)
-        continue;
-      evenWriter.append(new IntWritable(i), value);
-    }
-    evenWriter.close();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    fs.close();
-  }
-
-  private void scan(RecoveryLogReader reader, int start) throws IOException {
-    IntWritable key = new IntWritable();
-    BytesWritable value = new BytesWritable();
-
-    for (int i = start + 1; i < 1000; i++) {
-      if (i == 10)
-        continue;
-      assertTrue(reader.next(key, value));
-      assertEquals(i, key.get());
-    }
-  }
-
-  private void scanOdd(RecoveryLogReader reader, int start) throws IOException {
-    IntWritable key = new IntWritable();
-    BytesWritable value = new BytesWritable();
-
-    for (int i = start + 2; i < 1000; i += 2) {
-      assertTrue(reader.next(key, value));
-      assertEquals(i, key.get());
-    }
-  }
-
-  @Test
-  public void testMultiReader() throws IOException {
-    Path manyMaps = new Path("file://" + workDir.getAbsolutePath());
-    RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps);
-    IntWritable key = new IntWritable();
-    BytesWritable value = new BytesWritable();
-
-    for (int i = 0; i < 1000; i++) {
-      if (i == 10)
-        continue;
-      assertTrue(reader.next(key, value));
-      assertEquals(i, key.get());
-    }
-    assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0);
-    assertFalse(reader.next(key, value));
-
-    key.set(500);
-    assertTrue(reader.seek(key));
-    scan(reader, 500);
-    key.set(10);
-    assertFalse(reader.seek(key));
-    scan(reader, 10);
-    key.set(1000);
-    assertFalse(reader.seek(key));
-    assertFalse(reader.next(key, value));
-    key.set(-1);
-    assertFalse(reader.seek(key));
-    key.set(0);
-    assertTrue(reader.next(key, value));
-    assertEquals(0, key.get());
-    reader.close();
-
-    fs.deleteRecursively(new Path(manyMaps, "even"));
-    reader = new RecoveryLogReader(fs, manyMaps);
-    key.set(501);
-    assertTrue(reader.seek(key));
-    scanOdd(reader, 501);
-    key.set(1000);
-    assertFalse(reader.seek(key));
-    assertFalse(reader.next(key, value));
-    key.set(-1);
-    assertFalse(reader.seek(key));
-    key.set(1);
-    assertTrue(reader.next(key, value));
-    assertEquals(1, key.get());
-    reader.close();
-
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSortCheck() {
-
-    List<Entry<LogFileKey,LogFileValue>> unsorted = new ArrayList<>();
-
-    LogFileKey k1 = new LogFileKey();
-    k1.event = LogEvents.MANY_MUTATIONS;
-    k1.tabletId = 2;
-    k1.seq = 55;
-
-    LogFileKey k2 = new LogFileKey();
-    k2.event = LogEvents.MANY_MUTATIONS;
-    k2.tabletId = 9;
-    k2.seq = 9;
-
-    unsorted.add(new AbstractMap.SimpleEntry<>(k2, (LogFileValue) null));
-    unsorted.add(new AbstractMap.SimpleEntry<>(k1, (LogFileValue) null));
-
-    SortCheckIterator iter = new SortCheckIterator(unsorted.iterator());
-
-    while (iter.hasNext()) {
-      iter.next();
-    }
-  }
-
-  /**
-   * Test a failed marker doesn't cause issues. See Github issue
-   * https://github.com/apache/accumulo/issues/961
-   */
-  @Test
-  public void testFailed() throws Exception {
-    Path manyMaps = new Path("file://" + workDir.getAbsolutePath());
-    fs.create(new Path(manyMaps, SortedLogState.FAILED.getMarker())).close();
-
-    RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps);
-    IntWritable key = new IntWritable();
-    BytesWritable value = new BytesWritable();
-
-    for (int i = 0; i < 1000; i++) {
-      if (i == 10)
-        continue;
-      assertTrue(reader.next(key, value));
-      assertEquals(i, key.get());
-    }
-    reader.close();
-
-    assertTrue(fs.delete(new Path(manyMaps, SortedLogState.FAILED.getMarker())));
-  }
-
-}

Mime
View raw message