accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adamjsh...@apache.org
Subject [accumulo] branch 1.7 updated: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
Date Wed, 14 Feb 2018 20:38:42 GMT
This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.7 by this push:
     new 229eb7b  [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
229eb7b is described below

commit 229eb7be7f040ecdf1f0238201533529da59bca7
Author: Adam J. Shook <adamjshook@gmail.com>
AuthorDate: Wed Feb 14 14:15:05 2018 -0500

    [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
---
 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 .../accumulo/tserver/log/LocalWALRecoveryTest.java |   2 +-
 5 files changed, 208 insertions(+), 207 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8fd2b7a..7b8221f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -298,8 +298,7 @@ public class DfsLogger {
     metaReference = meta;
   }
 
-  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path,
AccumuloConfiguration conf) throws IOException {
-    FSDataInputStream input = fs.open(path);
+  public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input,
AccumuloConfiguration conf) throws IOException {
     DataInputStream decryptingInput = null;
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8);
@@ -375,7 +374,7 @@ public class DfsLogger {
 
       }
     } catch (EOFException e) {
-      log.warn("Got EOFException trying to read WAL header information, assuming the rest
of the file (" + path + ") has no data.");
+      log.warn("Got EOFException trying to read WAL header information, assuming the rest
of the file has no data.");
       // A TabletServer might have died before the (complete) header was written
       throw new LogHeaderIncompleteException(e);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 11097ce..ba5e488 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -113,44 +113,46 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams;
-        try {
-          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
-          // Creating a 'finished' marker will cause recovery to proceed normally and the
-          // empty file will be correctly ignored downstream.
-          fs.mkdirs(new Path(destPath));
-          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(),
part++);
-          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-          return;
-        }
+        try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+          DFSLoggerInputStreams inputStreams;
+          try {
+            inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
+            // Creating a 'finished' marker will cause recovery to proceed normally and the
+            // empty file will be correctly ignored downstream.
+            fs.mkdirs(new Path(destPath));
+            writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>>
emptyList(), part++);
+            fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+            return;
+          }
 
-        this.input = inputStreams.getOriginalInput();
-        this.decryptingInput = inputStreams.getDecryptingInputStream();
+          this.input = inputStreams.getOriginalInput();
+          this.decryptingInput = inputStreams.getDecryptingInputStream();
 
-        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
-        Thread.currentThread().setName("Sorting " + name + " for recovery");
-        while (true) {
-          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
-          try {
-            long start = input.getPos();
-            while (input.getPos() - start < bufferSize) {
-              LogFileKey key = new LogFileKey();
-              LogFileValue value = new LogFileValue();
-              key.readFields(decryptingInput);
-              value.readFields(decryptingInput);
-              buffer.add(new Pair<>(key, value));
+          final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+          Thread.currentThread().setName("Sorting " + name + " for recovery");
+          while (true) {
+            final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+            try {
+              long start = input.getPos();
+              while (input.getPos() - start < bufferSize) {
+                LogFileKey key = new LogFileKey();
+                LogFileValue value = new LogFileValue();
+                key.readFields(decryptingInput);
+                value.readFields(decryptingInput);
+                buffer.add(new Pair<>(key, value));
+              }
+              writeBuffer(destPath, buffer, part++);
+              buffer.clear();
+            } catch (EOFException ex) {
+              writeBuffer(destPath, buffer, part++);
+              break;
             }
-            writeBuffer(destPath, buffer, part++);
-            buffer.clear();
-          } catch (EOFException ex) {
-            writeBuffer(destPath, buffer, part++);
-            break;
           }
+          fs.create(new Path(destPath, "finished")).close();
+          log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part
+ " parts in " + getSortTime() + "ms");
         }
-        fs.create(new Path(destPath, "finished")).close();
-        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part
+ " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
           // parent dir may not exist
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 23a9fab..928861e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -100,28 +101,30 @@ public class LogReader {
       LogFileValue value = new LogFileValue();
 
       if (fs.isFile(path)) {
-        // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams;
-        try {
-          streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header for " + path + ". Ignoring...");
-          continue;
-        }
-        DataInputStream input = streams.getDecryptingInputStream();
-
-        try {
-          while (true) {
-            try {
-              key.readFields(input);
-              value.readFields(input);
-            } catch (EOFException ex) {
-              break;
+        try (final FSDataInputStream fsinput = fs.open(path)) {
+          // read log entries from a simple hdfs file
+          DFSLoggerInputStreams streams;
+          try {
+            streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header for " + path + ". Ignoring...");
+            continue;
+          }
+          DataInputStream input = streams.getDecryptingInputStream();
+
+          try {
+            while (true) {
+              try {
+                key.readFields(input);
+                value.readFields(input);
+              } catch (EOFException ex) {
+                break;
+              }
+              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
             }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+          } finally {
+            input.close();
           }
-        } finally {
-          input.close();
         }
       } else {
         // read the log entries sorted in a map file
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 28eca21..b910707 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -76,6 +76,7 @@ import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.transport.TTransportException;
@@ -320,46 +321,43 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver,
final ReplicationTarget target, final Path p, final Status status,
       final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final
ReplicaSystemHelper helper, long timeout) throws TTransportException,
       AccumuloException, AccumuloSecurityException {
-    DataInputStream input;
-    try {
-      input = getRFileInputStream(p);
-    } catch (IOException e) {
-      log.error("Could not create input stream from RFile, will retry", e);
-      return status;
-    }
-
-    Status lastStatus = status, currentStatus = status;
-    while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext,
peerTserver, new RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+    try (final DataInputStream input = getRFileInputStream(p)) {
+      Status lastStatus = status, currentStatus = status;
+      while (true) {
+        // Read and send a batch of mutations
+        ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext,
peerTserver, new RFileClientExecReturn(target, input, p,
+            currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target,
ProtobufUtil.toString(currentStatus));
+        log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target,
ProtobufUtil.toString(currentStatus));
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target,
ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target,
ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was error sending
the data)
-        // we can just not record any updates, and it will be picked up again by the work
assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was error sending
the data)
+          // we can just not record any updates, and it will be picked up again by the work
assigner
+          return status;
+        }
       }
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
     }
   }
 
@@ -369,11 +367,112 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
-    final DataInputStream input;
-    Span span = Trace.start("Read WAL header");
-    span.data("file", p.toString());
-    try {
-      input = getWalStream(p);
+    try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream input = getWalStream(p,
fsinput)) {
+      log.debug("Skipping unwanted data in WAL");
+      Span span = Trace.start("Consume WAL prefix");
+      span.data("file", p.toString());
+      try {
+        // We want to read all records in the WAL up to the "begin" offset contained in the
Status message,
+        // building a Set of tids from DEFINE_TABLET events which correspond to table ids
for future mutations
+        tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+      } catch (IOException e) {
+        log.warn("Unexpected error consuming file.");
+        return status;
+      } finally {
+        span.stop();
+      }
+
+      log.debug("Sending batches of data to peer tserver");
+
+      Status lastStatus = status, currentStatus = status;
+      final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+      while (true) {
+        // Set some trace info
+        span = Trace.start("Replicate WAL batch");
+        span.data("Batch size (bytes)", Long.toString(sizeLimit));
+        span.data("File", p.toString());
+        span.data("Peer instance name", peerContext.getInstance().getInstanceName());
+        span.data("Peer tserver", peerTserver.toString());
+        span.data("Remote table ID", remoteTableId);
+
+        ReplicationStats replResult;
+        try {
+          // Read and send a batch of mutations
+          replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver,
new WalClientExecReturn(target, input, p, currentStatus,
+              sizeLimit, remoteTableId, tcreds, tids), timeout);
+        } catch (Exception e) {
+          log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(),
peerTserver, e);
+          throw e;
+        } finally {
+          span.stop();
+        }
+
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
+
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+        log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target,
ProtobufUtil.toString(currentStatus));
+
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          span = Trace.start("Update replication table");
+          try {
+            if (null != accumuloUgi) {
+              final Status copy = currentStatus;
+              accumuloUgi.doAs(new PrivilegedAction<Void>() {
+                @Override
+                public Void run() {
+                  try {
+                    helper.recordNewStatus(p, copy, target);
+                  } catch (Exception e) {
+                    exceptionRef.set(e);
+                  }
+                  return null;
+                }
+              });
+              Exception e = exceptionRef.get();
+              if (null != e) {
+                if (e instanceof TableNotFoundException) {
+                  throw (TableNotFoundException) e;
+                } else if (e instanceof AccumuloSecurityException) {
+                  throw (AccumuloSecurityException) e;
+                } else if (e instanceof AccumuloException) {
+                  throw (AccumuloException) e;
+                } else {
+                  throw new RuntimeException("Received unexpected exception", e);
+                }
+              }
+            } else {
+              helper.recordNewStatus(p, currentStatus, target);
+            }
+          } catch (TableNotFoundException e) {
+            log.error("Tried to update status in replication table for {} as {}, but the
table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+            throw new RuntimeException("Replication table did not exist, will retry", e);
+          } finally {
+            span.stop();
+          }
+
+          log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus));
+
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
+        } else {
+          log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target,
ProtobufUtil.toString(lastStatus));
+
+          // otherwise, we didn't actually replicate (likely because there was error sending
the data)
+          // we can just not record any updates, and it will be picked up again by the work
assigner
+          return status;
+        }
+      }
     } catch (LogHeaderIncompleteException e) {
       log.warn("Could not read header from {}, assuming that there is no data present in
the WAL, therefore replication is complete", p);
       Status newStatus;
@@ -383,7 +482,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       } else {
         newStatus = Status.newBuilder(status).setBegin(status.getEnd()).build();
       }
-      span = Trace.start("Update replication table");
+      Span span = Trace.start("Update replication table");
       try {
         helper.recordNewStatus(p, newStatus, target);
       } catch (TableNotFoundException tnfe) {
@@ -397,114 +496,6 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       log.error("Could not create stream for WAL", e);
       // No data sent (bytes nor records) and no progress made
       return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Skipping unwanted data in WAL");
-    span = Trace.start("Consume WAL prefix");
-    span.data("file", p.toString());
-    try {
-      // We want to read all records in the WAL up to the "begin" offset contained in the
Status message,
-      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for
future mutations
-      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
-    } catch (IOException e) {
-      log.warn("Unexpected error consuming file.");
-      return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Sending batches of data to peer tserver");
-
-    Status lastStatus = status, currentStatus = status;
-    final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-    while (true) {
-      // Set some trace info
-      span = Trace.start("Replicate WAL batch");
-      span.data("Batch size (bytes)", Long.toString(sizeLimit));
-      span.data("File", p.toString());
-      span.data("Peer instance name", peerContext.getInstance().getInstanceName());
-      span.data("Peer tserver", peerTserver.toString());
-      span.data("Remote table ID", remoteTableId);
-
-      ReplicationStats replResult;
-      try {
-        // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver,
new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids), timeout);
-      } catch (Exception e) {
-        log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(),
peerTserver, e);
-        throw e;
-      } finally {
-        span.stop();
-      }
-
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
-
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
-
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target,
ProtobufUtil.toString(currentStatus));
-
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        span = Trace.start("Update replication table");
-        try {
-          if (null != accumuloUgi) {
-            final Status copy = currentStatus;
-            accumuloUgi.doAs(new PrivilegedAction<Void>() {
-              @Override
-              public Void run() {
-                try {
-                  helper.recordNewStatus(p, copy, target);
-                } catch (Exception e) {
-                  exceptionRef.set(e);
-                }
-                return null;
-              }
-            });
-            Exception e = exceptionRef.get();
-            if (null != e) {
-              if (e instanceof TableNotFoundException) {
-                throw (TableNotFoundException) e;
-              } else if (e instanceof AccumuloSecurityException) {
-                throw (AccumuloSecurityException) e;
-              } else if (e instanceof AccumuloException) {
-                throw (AccumuloException) e;
-              } else {
-                throw new RuntimeException("Received unexpected exception", e);
-              }
-            }
-          } else {
-            helper.recordNewStatus(p, currentStatus, target);
-          }
-        } catch (TableNotFoundException e) {
-          log.error("Tried to update status in replication table for {} as {}, but the table
did not exist", p, ProtobufUtil.toString(currentStatus), e);
-          throw new RuntimeException("Replication table did not exist, will retry", e);
-        } finally {
-          span.stop();
-        }
-
-        log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus));
-
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
-        } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target,
ProtobufUtil.toString(lastStatus));
-
-        // otherwise, we didn't actually replicate (likely because there was error sending
the data)
-        // we can just not record any updates, and it will be picked up again by the work
assigner
-        return status;
-      }
     }
   }
 
@@ -686,9 +677,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return tids;
   }
 
-  public DataInputStream getWalStream(Path p) throws IOException {
-    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
-    return streams.getDecryptingInputStream();
+  public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException
{
+    Span span = Trace.start("Read WAL header");
+    span.data("file", p.toString());
+    try {
+      DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf);
+      return streams.getDecryptingInputStream();
+    } finally {
+      span.stop();
+    }
   }
 
   protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path
p, Status status, long sizeLimit, Set<Integer> desiredTids)
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
index 8261a17..f1f6a3a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
@@ -78,7 +78,7 @@ public class LocalWALRecoveryTest {
     final Path path = recovered[0].getPath();
     final VolumeManager volumeManager = VolumeManagerImpl.getLocal(folder.getRoot().getAbsolutePath());
 
-    final DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(volumeManager,
path, configuration);
+    final DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(volumeManager.open(path),
configuration);
     final DataInputStream input = streams.getDecryptingInputStream();
 
     final LogFileKey key = new LogFileKey();

-- 
To stop receiving notification emails like this one, please contact
adamjshook@apache.org.

Mime
View raw message