hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haiboc...@apache.org
Subject [27/50] [abbrv] hadoop git commit: MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
Date Mon, 29 Jan 2018 05:10:45 GMT
MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56872cff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56872cff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56872cff

Branch: refs/heads/YARN-1011
Commit: 56872cff92f543bf77206a1324968559dceb7bc2
Parents: 8b5b045
Author: Chris Douglas <cdouglas@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cdouglas@apache.org>
Committed: Fri Jan 26 09:18:30 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
 .../java/org/apache/hadoop/io/TestMapFile.java  | 59 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56872cff/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index d56822f..51db0b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
                                     (LongWritable.class));
     }
     try {
-      long pos = 0L;
+      /** What's the position (in bytes) we wrote when we got the last index */
+      long lastIndexPos = -1;
+      /**
+       * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+       * that we have an index at position zero - midKey will throw an exception
+       * if this is not the case
+       */
+      long lastIndexKeyCount = Long.MIN_VALUE;
+      long pos = dataReader.getPosition();
       LongWritable position = new LongWritable();
+      long nextBlock = pos;
+      boolean blockCompressed = dataReader.isBlockCompressed();
       while(dataReader.next(key, value)) {
-        cnt++;
-        if (cnt % indexInterval == 0) {
+        if (blockCompressed) {
+          long curPos = dataReader.getPosition();
+          if (curPos > nextBlock) {
+            pos = nextBlock;                       // current block position
+            nextBlock = curPos;
+          }
+        }
+        // Follow the same logic as in
+        // {@link MapFile.Writer#append(WritableComparable, Writable)}
+        if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos)
{
           position.set(pos);
-          if (!dryrun) indexWriter.append(key, position);
+          if (!dryrun) {
+            indexWriter.append(key, position);
+          }
+          lastIndexPos = pos;
+          lastIndexKeyCount = cnt;
+        }
+        if (!blockCompressed) {
+          pos = dataReader.getPosition();         // next record position
         }
-        pos = dataReader.getPosition();
+        cnt++;
       }
     } catch(Throwable t) {
       // truncated data file. swallow it.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56872cff/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
       IOUtils.cleanup(null, writer);
     }
   }
+
+  /**
+   * test {@link MapFile#fix(FileSystem, Path, Class<? extends Writable>,
+   *                         Class<? extends Writable>, boolean, Configuration)}
+   * method in case of BLOCK compression
+   */
+  @Test
+  public void testFixBlockCompress() throws Exception {
+    final String indexLessMapFile = "testFixBlockCompress.mapfile";
+    final int compressBlocksize = 100;
+    final int indexInterval = 4;
+    final int noBlocks = 4;
+    final String value = "value-";
+    final int size = noBlocks * compressBlocksize / (4 + value.length());
+
+    conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+    MapFile.Writer.setIndexInterval(conf, indexInterval);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(TEST_DIR, indexLessMapFile);
+    MapFile.Writer writer = null;
+    MapFile.Reader reader = null;
+    try {
+      writer =
+          new MapFile.Writer(conf, dir,
+          MapFile.Writer.keyClass(IntWritable.class),
+          MapFile.Writer.valueClass(Text.class),
+          MapFile.Writer.compression(CompressionType.BLOCK));
+      for (int i = 0; i < size; i++) {
+        writer.append(new IntWritable(i), new Text(value + i));
+      }
+      writer.close();
+      Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+      fs.rename(index, index.suffix(".orig"));
+
+      assertEquals("No of valid MapFile entries wrong", size,
+                   MapFile.fix(fs, dir, IntWritable.class, Text.class,
+                               false, conf));
+      reader = new MapFile.Reader(dir, conf);
+      IntWritable key;
+      Text val = new Text();
+      int notFound = 0;
+      for (int i = 0; i < size; i++) {
+        key = new IntWritable(i);
+        if (null == reader.get(key, val)) {
+          notFound++;
+        }
+      }
+      assertEquals("With MapFile.fix-ed index, could not get entries # ",
+                   0, notFound);
+    } finally {
+      IOUtils.cleanupWithLogger(null, writer, reader);
+      if (fs.exists(dir)) {
+        fs.delete(dir, true);
+      }
+    }
+  }
+
   /**
    * test all available constructor for {@code MapFile.Writer}
    */
@@ -619,7 +676,7 @@ public class TestMapFile {
     } catch (Exception ex) {
       fail("testMainMethodMapFile error !!!");
     } finally {
-      IOUtils.cleanup(null, writer);
+      IOUtils.cleanupWithLogger(null, writer);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message