accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-1814 Fixes WALog reading issue from 1.5
Date Mon, 28 Oct 2013 14:12:10 GMT
Updated Branches:
  refs/heads/master 2505f7a8c -> 8730d5cef


ACCUMULO-1814 Fixes WALog reading issue from 1.5

WALogs from 1.5.0 use a slightly different format than WALogs
from 1.6.0.  Consequently, the code to read WALogs from 1.6.0
doesn't work on files from 1.5.0.  I made changes to
DFSLogger to correctly interpret 1.5.0 based log files,
and increments the log version number to 3, so that
the WALog reading code can correctly identify what type of
file we're dealing with.

This may still require more testing especially with encryption
turned on but for now this will do.

Signed-off-by: Eric Newton <eric.newton@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8730d5ce
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8730d5ce
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8730d5ce

Branch: refs/heads/master
Commit: 8730d5cef37f301fc3f204a48e6af63c32a4f207
Parents: 2505f7a
Author: Michael Allen <michael@sqrrl.com>
Authored: Mon Oct 28 00:12:11 2013 -0400
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Mon Oct 28 09:44:01 2013 -0400

----------------------------------------------------------------------
 server/pom.xml                                  |  10 ++
 .../accumulo/server/logger/LogReader.java       |  18 ++-
 .../server/tabletserver/log/DfsLogger.java      | 123 ++++++++++++---
 .../server/tabletserver/log/LogSorter.java      |  47 +-----
 .../log/TestUpgradePathForWALogs.java           | 154 +++++++++++++++++++
 server/src/test/resources/walog-from-15.walog   | Bin 0 -> 25608 bytes
 server/src/test/resources/walog-from-16.walog   | Bin 0 -> 8170 bytes
 7 files changed, 282 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index ff846b4..f7da3bb 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -128,6 +128,16 @@
       <testResource>
         <filtering>true</filtering>
         <directory>src/test/resources</directory>
+        <excludes>
+          <exclude>**/*.walog</exclude>
+        </excludes>
+      </testResource>
+      <testResource>
+        <filtering>false</filtering>
+        <directory>src/test/resources</directory>
+        <includes>
+          <include>**/*.walog</include>
+        </includes>
       </testResource>
     </testResources>
     <pluginManagement>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
index 719b85c..9368819 100644
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
+++ b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
@@ -16,25 +16,25 @@
  */
 package org.apache.accumulo.server.logger;
 
+import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -90,26 +90,28 @@ public class LogReader {
     
     for (String file : opts.files) {
       
-      Map<String, String> meta = new HashMap<String, String>();
       Path path = new Path(file);
       LogFileKey key = new LogFileKey();
       LogFileValue value = new LogFileValue();
       
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+        @SuppressWarnings("deprecation")
+        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getSiteConfiguration());
       
+        DataInputStream input =  streams.getDecryptingInputStream();
+
         try {
           while (true) {
             try {
-              key.readFields(f);
-              value.readFields(f);
+              key.readFields(input);
+              value.readFields(input);
             } catch (EOFException ex) {
               break;
             }
             printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
           }
         } finally {
-          f.close();
+          input.close();
         }
       } else {
         // read the log entries sorted in a map file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
index 0b64d42..de3d012 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
@@ -22,6 +22,7 @@ import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
 import static org.apache.accumulo.server.logger.LogEvents.OPEN;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -30,6 +31,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
+import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.ServerConstants;
@@ -63,6 +66,7 @@ import org.apache.log4j.Logger;
 public class DfsLogger {
   // Package private so that LogSorter can find this
   static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
   
   private static Logger log = Logger.getLogger(DfsLogger.class);
   
@@ -74,6 +78,34 @@ public class DfsLogger {
     }
   }
   
+  public static class DFSLoggerInputStreams {
+    
+    private FSDataInputStream originalInput;
+    private DataInputStream decryptingInputStream;
+
+    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream)
{
+      this.originalInput = originalInput;
+      this.decryptingInputStream = decryptingInputStream;
+    }
+
+    public FSDataInputStream getOriginalInput() {
+      return originalInput;
+    }
+
+    public void setOriginalInput(FSDataInputStream originalInput) {
+      this.originalInput = originalInput;
+    }
+
+    public DataInputStream getDecryptingInputStream() {
+      return decryptingInputStream;
+    }
+
+    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
+      this.decryptingInputStream = decryptingInputStream;
+    }
+  }
+  
+  
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
     
@@ -210,28 +242,83 @@ public class DfsLogger {
     this.logPath = filename;
   }
   
-  public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String>
opts) throws IOException {
-    FSDataInputStream file = fs.open(path);
-    try {
-      byte[] magic = LOG_FILE_HEADER_V2.getBytes();
-      byte[] buffer = new byte[magic.length];
-      file.readFully(buffer);
-      if (Arrays.equals(buffer, magic)) {
-        int count = file.readInt();
+  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path,
AccumuloConfiguration conf) throws IOException {
+    FSDataInputStream input = fs.open(path);
+    DataInputStream decryptingInput = null;
+    
+    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
+    byte[] magicBuffer = new byte[magic.length];
+    input.readFully(magicBuffer);
+    if (Arrays.equals(magicBuffer, magic)) {
+      // additional parameters it needs from the underlying stream.
+      String cryptoModuleClassname = input.readUTF();
+      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+          .getCryptoModule(cryptoModuleClassname);
+
+      // Create the parameters and set the input stream into those parameters
+      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+      params.setEncryptedInputStream(input);
+
+      // Create the plaintext input stream from the encrypted one
+      params = cryptoModule.getDecryptingInputStream(params);
+
+      if (params.getPlaintextInputStream() instanceof DataInputStream) {
+        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+      } else {
+        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+      }
+    } else {
+      input.seek(0);
+      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+      byte[] magicBufferV2 = new byte[magic.length];
+      input.readFully(magicBufferV2);
+
+      if (Arrays.equals(magicBufferV2, magicV2)) {
+        // Log files from 1.5 dump their options in raw to the logger files.  Since we don't
know the class
+        // that needs to read those files, we can make a couple of basic assumptions.  Either
it's going to be
+        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
+        
+        // If it's null, we won't have any parameters whatsoever.  First, let's attempt to
read 
+        // parameters
+        Map<String,String> opts = new HashMap<String,String>();
+        int count = input.readInt();
         for (int i = 0; i < count; i++) {
-          String key = file.readUTF();
-          String value = file.readUTF();
+          String key = input.readUTF();
+          String value = input.readUTF();
           opts.put(key, value);
         }
+        
+        if (opts.size() == 0) {
+          // NullCryptoModule, we're done
+          decryptingInput = input;
+        } else {
+          
+          // The DefaultCryptoModule will want to read the parameters from the underlying
file, so we will put the file back to that spot.
+          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+              .getCryptoModule(DefaultCryptoModule.class.getName());
+
+          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+          
+          input.seek(0);
+          input.readFully(magicBuffer);
+          params.setEncryptedInputStream(input);
+
+          params = cryptoModule.getDecryptingInputStream(params);
+          if (params.getPlaintextInputStream() instanceof DataInputStream) {
+            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+          } else {
+            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+          }
+        }
+        
       } else {
-        file.seek(0);
-        return file;
+
+        input.seek(0);
+        decryptingInput = input;
       }
-      return file;
-    } catch (IOException ex) {
-      file.seek(0);
-      return file;
+
     }
+    return new DFSLoggerInputStreams(input, decryptingInput);
   }
   
   public synchronized void open(String address) throws IOException {
@@ -278,7 +365,7 @@ public class DfsLogger {
           .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
       
       // Initialize the log file with a header and the crypto params used to set up this
log file.
-      logFile.write(LOG_FILE_HEADER_V2.getBytes());
+      logFile.write(LOG_FILE_HEADER_V3.getBytes());
 
       CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
       
@@ -290,8 +377,6 @@ public class DfsLogger {
       logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
       
       
-      //@SuppressWarnings("deprecation")
-      //OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile,
cryptoOpts);
       params = cryptoModule.getEncryptingOutputStream(params);
       OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
       

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
index abd9c6f..a630e5a 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
@@ -20,7 +20,6 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -34,14 +33,13 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
-import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
-import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -112,46 +110,9 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
         
-        Map<String, String> opts = new HashMap<String,String>();
-        FSDataInputStream tmpInput = DfsLogger.readHeader(fs, srcPath, opts); 
-                
-        byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
-        byte[] magicBuffer = new byte[magic.length];
-        tmpInput.readFully(magicBuffer);
-        if (!Arrays.equals(magicBuffer, magic)) {
-          tmpInput.seek(0);
-          synchronized (this) {
-           this.input = tmpInput;
-           this.decryptingInput = tmpInput;
-          }
-        } else {
-          // We read the crypto module class name here because we need to boot strap the
class.  The class itself will read any 
-          // additional parameters it needs from the underlying stream.
-          String cryptoModuleClassname = tmpInput.readUTF();
-          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-              .getCryptoModule(cryptoModuleClassname);
-          
-          // Create the parameters and set the input stream into those parameters
-          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-          params.setEncryptedInputStream(tmpInput);
-          
-          // Create the plaintext input stream from the encrypted one
-          params = cryptoModule.getDecryptingInputStream(params);
-          
-          // Store the plaintext input stream into member variables
-          synchronized (this) {
-            this.input = tmpInput;
-            
-            if (params.getPlaintextInputStream() instanceof DataInputStream) {
-              this.decryptingInput = (DataInputStream)params.getPlaintextInputStream(); 
            
-            } else {
-              this.decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-            }
-            
-          }
-          
-        }
-                
+        DFSLoggerInputStreams inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath,
conf);
+        this.input = inputStreams.getOriginalInput();
+        this.decryptingInput = inputStreams.getDecryptingInputStream();
         
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java
b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java
new file mode 100644
index 0000000..ef9439f
--- /dev/null
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.tabletserver.log;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestUpgradePathForWALogs {
+
+  private static final String WALOG_FROM_15 = "/walog-from-15.walog";
+  private static final String WALOG_FROM_16 = "/walog-from-16.walog";
+
+  VolumeManager fs;
+
+  TemporaryFolder root;
+
+  @Before
+  public void setUp() throws Exception {
+    File tempFile = File.createTempFile("TestUpgradePathForWALogs", "");
+    String tempDirName = tempFile.getAbsolutePath() + "Dir";
+    tempFile.delete();
+
+    File tempDir = new File(tempDirName);
+    tempDir.mkdirs();
+
+    root = new TemporaryFolder(new File(tempDirName));
+
+    // quiet log messages about compress.CodecPool
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    fs = VolumeManagerImpl.getLocal();
+    root.create();
+    String path = root.getRoot().getAbsolutePath();
+    Path manyMapsPath = new Path("file://" + path + "/manyMaps");
+    fs.mkdirs(manyMapsPath);
+    fs.create(new Path(manyMapsPath, "finished")).close();
+    // FileSystem ns = fs.getDefaultVolume();
+    // Writer writer = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class,
BytesWritable.class);
+    // BytesWritable value = new BytesWritable("someValue".getBytes());
+    // for (int i = 1; i < 1000; i += 2) {
+    // writer.append(new IntWritable(i), value);
+    // }
+    // writer.close();
+    // writer = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class,
BytesWritable.class);
+    // for (int i = 0; i < 1000; i += 2) {
+    // if (i == 10)
+    // continue;
+    // writer.append(new IntWritable(i), value);
+    // }
+    // writer.close();
+  }
+
+  @Test
+  public void testUpgradeOf15WALog() throws IOException {
+    InputStream walogStream = null;
+    OutputStream walogInHDFStream = null;
+
+    try {
+
+      walogStream = getClass().getResourceAsStream(WALOG_FROM_15);
+      walogInHDFStream = new FileOutputStream(new File(root.getRoot().getAbsolutePath() +
WALOG_FROM_15));
+
+      IOUtils.copyLarge(walogStream, walogInHDFStream);
+      walogInHDFStream.flush();
+      walogInHDFStream.close();
+      walogInHDFStream = null;
+
+      @SuppressWarnings("deprecation")
+      LogSorter logSorter = new LogSorter(null, fs, DefaultConfiguration.getSiteConfiguration());
+      LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
+
+      logProcessor.sort(WALOG_FROM_15, new Path("file://" + root.getRoot().getAbsolutePath()
+ WALOG_FROM_15), "file://" + root.getRoot().getAbsolutePath()
+          + "/manyMaps");
+
+    } finally {
+      if (walogStream != null) {
+        walogStream.close();
+      }
+
+      if (walogInHDFStream != null) {
+        walogInHDFStream.close();
+      }
+    }
+  }
+  
+  @Test
+  public void testBasic16WALogRead() throws IOException {
+    String walogToTest = WALOG_FROM_16;
+    
+    InputStream walogStream = null;
+    OutputStream walogInHDFStream = null;
+
+    try {
+
+      walogStream = getClass().getResourceAsStream(walogToTest);
+      walogInHDFStream = new FileOutputStream(new File(root.getRoot().getAbsolutePath() +
walogToTest));
+
+      IOUtils.copyLarge(walogStream, walogInHDFStream);
+      walogInHDFStream.flush();
+      walogInHDFStream.close();
+      walogInHDFStream = null;
+
+      @SuppressWarnings("deprecation")
+      LogSorter logSorter = new LogSorter(null, fs, DefaultConfiguration.getSiteConfiguration());
+      LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
+
+      logProcessor.sort(walogToTest, new Path("file://" + root.getRoot().getAbsolutePath()
+ walogToTest), "file://" + root.getRoot().getAbsolutePath()
+          + "/manyMaps");
+
+    } finally {
+      if (walogStream != null) {
+        walogStream.close();
+      }
+
+      if (walogInHDFStream != null) {
+        walogInHDFStream.close();
+      }
+    }    
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // root.delete();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/resources/walog-from-15.walog
----------------------------------------------------------------------
diff --git a/server/src/test/resources/walog-from-15.walog b/server/src/test/resources/walog-from-15.walog
new file mode 100644
index 0000000..1922dea
Binary files /dev/null and b/server/src/test/resources/walog-from-15.walog differ

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/resources/walog-from-16.walog
----------------------------------------------------------------------
diff --git a/server/src/test/resources/walog-from-16.walog b/server/src/test/resources/walog-from-16.walog
new file mode 100644
index 0000000..4654f00
Binary files /dev/null and b/server/src/test/resources/walog-from-16.walog differ


Mime
View raw message