avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1640179 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/tools/src/main/java/org/apache/avro/tool/ lang/java/tools/src/test/java/org/apache/avro/tool/
Date Mon, 17 Nov 2014 16:04:36 GMT
Author: tomwhite
Date: Mon Nov 17 16:04:35 2014
New Revision: 1640179

URL: http://svn.apache.org/r1640179
Log:
AVRO-834. Java: Data File corruption recovery tool. Contributed by scottcarey and tomwhite.

Added:
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
    avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Nov 17 16:04:35 2014
@@ -29,6 +29,9 @@ Trunk (not yet released)
     AVRO-570. Python: Add connector for tethered mapreduce.
     (Jeremy Lewi and Steven Willis via cutting)    
 
+    AVRO-834. Java: Data File corruption recovery tool.
+    (scottcarey and tomwhite)
+
   OPTIMIZATIONS
 
   IMPROVEMENTS

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Mon Nov 17 16:04:35 2014
@@ -251,6 +251,9 @@ public class DataFileStream<D> implement
   /** Expert: Return the count of items in the current block. */
   public long getBlockCount() { return blockCount; }
 
+  /** Expert: Return the size in bytes (uncompressed) of the current block. */
+  public long getBlockSize() { return blockSize; }
+
   protected void blockFinished() throws IOException {
     // nothing for the stream impl
   }

Added: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java?rev=1640179&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java (added)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java Mon Nov 17 16:04:35 2014
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+
+/** Recovers data from a corrupt Avro Data file */
+public class DataFileRepairTool implements Tool {
+
+  @Override
+  public String getName() {
+    return "repair";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Recovers data from a corrupt Avro Data file";
+  }
+  
+  private void printInfo(PrintStream output) {
+    output.println("Insufficient arguments.  Arguments:  [-o option] "
+        + "input_file output_file \n"
+        + "   Where option is one of the following: \n"
+        + "      " + ALL
+        + " (default) recover as many records as possible.\n"
+        + "      " + PRIOR
+        + "         recover only records prior to the first instance"
+        + " of corruption \n"
+        + "      " + AFTER
+        + "         recover only records after the first instance of"
+        + " corruption.\n"
+        + "      " + REPORT
+        + "        print the corruption report only, reporting the\n"
+        + "                    number of valid and corrupted blocks and records\n"
+        + "   input_file is the file to read from.  output_file is the file to\n"
+        + "   create and write recovered data to.  output_file is ignored if\n"
+        + "   using the report option.");
+  }
+
+  private static final Set<String> OPTIONS = new HashSet<String>();
+  private static final String ALL = "all";
+  private static final String PRIOR = "prior";
+  private static final String AFTER = "after";
+  private static final String REPORT = "report";
+  static {
+    OPTIONS.add(ALL);
+    OPTIONS.add(PRIOR);
+    OPTIONS.add(AFTER);
+    OPTIONS.add(REPORT);
+  }
+
+  @Override
+  public int run(InputStream stdin, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+    if (args.size() < 2) {
+      printInfo(err);
+      return 1;
+    }
+    int index = 0;
+    String input = args.get(index);
+    String option = "all";
+    if ("-o".equals(input)) {
+      option = args.get(1);
+      index += 2;
+    }
+    if (!OPTIONS.contains(option) || (args.size() - index < 1)) {
+      printInfo(err);
+      return 1;
+    }
+    input = args.get(index++);
+    if (!REPORT.equals(option)) {
+      if (args.size() - index < 1) {
+        printInfo(err);
+        return 1;
+      } 
+    }
+    if (ALL.equals(option)) {
+      return recoverAll(input, args.get(index), out, err);
+    } else if (PRIOR.equals(option)) {
+      return recoverPrior(input, args.get(index), out, err);
+    } else if (AFTER.equals(option)) {
+      return recoverAfter(input, args.get(index), out, err);
+    } else if (REPORT.equals(option)) {
+      return reportOnly(input, out, err);
+    } else {
+      return 1;
+    }
+  }
+
+  private int recover(String input, String output, PrintStream out,
+      PrintStream err, boolean recoverPrior, boolean recoverAfter)
+      throws IOException {
+    File infile = new File(input);
+    if (!infile.canRead()) {
+      err.println("cannot read file: " + input);
+      return 1;
+    }
+    out.println("Recovering file: " + input);
+    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+    DataFileReader<Object> fileReader = new DataFileReader<Object>(infile,
+        reader);
+    try {
+      Schema schema = fileReader.getSchema();
+      String codecStr = fileReader.getMetaString(DataFileConstants.CODEC);
+      CodecFactory codecFactory = CodecFactory.fromString("" + codecStr);
+      List<String> metas = fileReader.getMetaKeys();
+      if (recoverPrior || recoverAfter) {
+        GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>();
+        DataFileWriter<Object> fileWriter = new DataFileWriter<Object>(writer);
+        try {
+          File outfile = new File(output);
+          for (String key : metas) {
+            if (!key.startsWith("avro.")) {
+              byte[] val = fileReader.getMeta(key);
+              fileWriter.setMeta(key, val);
+            }
+          }
+          fileWriter.setCodec(codecFactory);
+          int result = innerRecover(fileReader, fileWriter, out, err, recoverPrior,
+              recoverAfter, schema, outfile);
+          return result;
+        } catch (Exception e) {
+          e.printStackTrace(err);
+          return 1;
+        } 
+      } else {
+        return innerRecover(fileReader, null, out, err, recoverPrior,
+            recoverAfter, null, null);
+      }
+
+    } finally {
+      fileReader.close();
+    }
+  }
+
+  private int innerRecover(DataFileReader<Object> fileReader,
+      DataFileWriter<Object> fileWriter, PrintStream out, PrintStream err,
+      boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
+    int numBlocks = 0;
+    int numCorruptBlocks = 0;
+    int numRecords = 0;
+    int numCorruptRecords = 0;
+    int recordsWritten = 0;
+    long position = fileReader.previousSync();
+    long blockSize = 0;
+    long blockCount = 0;
+    boolean fileWritten = false;
+    try {
+      while (true) {
+        try {
+          if (!fileReader.hasNext()) {
+            out.println("File Summary: ");
+            out.println("  Number of blocks: " + numBlocks
+                + " Number of corrupt blocks: " + numCorruptBlocks);
+            out.println("  Number of records: " + numRecords
+                + " Number of corrupt records: " + numCorruptRecords);
+            if (recoverAfter || recoverPrior) {
+              out.println("  Number of records written " + recordsWritten);
+            }
+            out.println();
+            return 0;
+          }
+          position = fileReader.previousSync();
+          blockCount = fileReader.getBlockCount();
+          blockSize = fileReader.getBlockSize();
+          numRecords += blockCount;
+          long blockRemaining = blockCount;
+          numBlocks++;
+          boolean lastRecordWasBad = false;
+          long badRecordsInBlock = 0;
+          while (blockRemaining > 0) {
+            try {
+              Object datum = fileReader.next();
+              if ((recoverPrior && numCorruptBlocks == 0)
+                  || (recoverAfter && numCorruptBlocks > 0)) {
+                if (!fileWritten) {
+                  try {
+                    fileWriter.create(schema, outfile);
+                    fileWritten = true;
+                  } catch (Exception e) {
+                    e.printStackTrace(err);
+                    return 1;
+                  }
+                }
+                try {
+                  fileWriter.append(datum);
+                  recordsWritten++;
+                } catch (Exception e) {
+                  e.printStackTrace(err);
+                  throw e;
+                }
+              }
+              blockRemaining--;
+              lastRecordWasBad = false;
+            } catch (Exception e) {
+              long pos = blockCount - blockRemaining;
+              if (badRecordsInBlock == 0) {
+                // first corrupt record
+                numCorruptBlocks++;
+                err.println("Corrupt block: " + numBlocks + " Records in block: "
+                    + blockCount + " uncompressed block size: " + blockSize);
+                err.println("Corrupt record at position: "
+                    + (pos));
+              } else {
+                // second bad record in block, if consecutive skip block.
+                err.println("Corrupt record at position: "
+                    + (pos));
+                if (lastRecordWasBad) {
+                  // consecutive bad record
+                  err.println("Second consecutive bad record in block: " + numBlocks 
+                      + ". Skipping remainder of block. ");
+                  numCorruptRecords += blockRemaining;
+                  badRecordsInBlock += blockRemaining;
+                  try {
+                    fileReader.sync(position);
+                  } catch (Exception e2) {
+                    err.println("failed to sync to sync marker, aborting");
+                    e2.printStackTrace(err);
+                    return 1;
+                  }
+                  break;
+                }
+              }
+              blockRemaining --;
+              lastRecordWasBad = true;
+              numCorruptRecords++;
+              badRecordsInBlock++;
+            }
+          }
+          if (badRecordsInBlock != 0) {
+            err.println("** Number of unrecoverable records in block: "
+                + (badRecordsInBlock));
+          }
+          position = fileReader.previousSync();
+        } catch (Exception e) {
+          err.println("Failed to read block " + numBlocks + ". Unknown record "
+              + "count in block.  Skipping. Reason: " + e.getMessage());
+          numCorruptBlocks++;
+          try {
+            fileReader.sync(position);
+          } catch (Exception e2) {
+            err.println("failed to sync to sync marker, aborting");
+            e2.printStackTrace(err);
+            return 1;
+          }
+        } 
+      }
+    } finally {
+      if (fileWritten) {
+        try {
+          fileWriter.close();
+        } catch (Exception e) {
+          e.printStackTrace(err);
+          return 1;
+        }
+      }
+    }
+  }
+
+  private int reportOnly(String input, PrintStream out, PrintStream err)
+      throws IOException {
+    return recover(input, null, out, err, false, false);
+  }
+
+  private int recoverAfter(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, false, true);
+  }
+
+  private int recoverPrior(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, true, false);
+  }
+
+  private int recoverAll(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, true, true);
+  }
+}

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java Mon Nov 17 16:04:35 2014
@@ -45,6 +45,7 @@ public class Main {
         new DataFileWriteTool(),
         new DataFileGetMetaTool(),
         new DataFileGetSchemaTool(),
+        new DataFileRepairTool(),
         new IdlTool(),
         new IdlToSchemataTool(),
         new RecodecTool(),

Added: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java?rev=1640179&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java (added)
+++ avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java Mon Nov 17 16:04:35 2014
@@ -0,0 +1,190 @@
+package org.apache.avro.tool;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.util.Utf8;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestDataFileRepairTool {
+
+  private static final Schema SCHEMA = Schema.create(Schema.Type.STRING);
+  private static File corruptBlockFile;
+  private static File corruptRecordFile;
+
+  private File repairedFile;
+
+  @BeforeClass
+  public static void writeCorruptFile() throws IOException {
+    // Write a data file
+    DataFileWriter<Utf8> w = new DataFileWriter<Utf8>(new GenericDatumWriter<Utf8>(SCHEMA));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    w.create(SCHEMA, baos);
+    w.append(new Utf8("apple"));
+    w.append(new Utf8("banana"));
+    w.append(new Utf8("celery"));
+    w.sync();
+    w.append(new Utf8("date"));
+    w.append(new Utf8("endive"));
+    w.append(new Utf8("fig"));
+    long pos = w.sync();
+    w.append(new Utf8("guava"));
+    w.append(new Utf8("hazelnut"));
+    w.close();
+
+    byte[] original = baos.toByteArray();
+
+    // Corrupt the second block by inserting some zero bytes before the sync marker
+    int corruptPosition = (int) pos - DataFileConstants.SYNC_SIZE;
+    int corruptedBytes = 3;
+    byte[] corrupted = new byte[original.length + corruptedBytes];
+    System.arraycopy(original, 0, corrupted, 0, corruptPosition);
+    System.arraycopy(original, corruptPosition,
+        corrupted, corruptPosition + corruptedBytes, original.length - corruptPosition);
+
+    corruptBlockFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class,
+        "corruptBlock.avro");
+    corruptBlockFile.deleteOnExit();
+    FileOutputStream out = new FileOutputStream(corruptBlockFile);
+    out.write(corrupted);
+    out.close();
+
+    // Corrupt the "endive" record by changing the length of the string to be negative
+    corruptPosition = (int) pos - DataFileConstants.SYNC_SIZE -
+        (1 + "fig".length() + 1 + "endive".length());
+    corrupted = new byte[original.length];
+    System.arraycopy(original, 0, corrupted, 0, original.length);
+    BinaryData.encodeLong(-1, corrupted, corruptPosition);
+
+    corruptRecordFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class,
+        "corruptRecord.avro");
+    corruptRecordFile.deleteOnExit();
+    out = new FileOutputStream(corruptRecordFile);
+    out.write(corrupted);
+    out.close();
+  }
+
+  @Before
+  public void setUp() {
+    repairedFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class, "repaired.avro");
+  }
+
+  @After
+  public void tearDown() {
+    repairedFile.delete();
+  }
+
+  private String run(Tool tool, String... args) throws Exception {
+    return run(tool, null, args);
+  }
+
+  private String run(Tool tool, InputStream stdin, String... args) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    PrintStream stdout = new PrintStream(out);
+    tool.run(
+        stdin,
+        stdout,
+        System.err,
+        Arrays.asList(args));
+    return out.toString("UTF-8").replace("\r", "");
+  }
+
+  @Test
+  public void testReportCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "report", corruptBlockFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 5 Number of corrupt records: 0"));
+  }
+
+  @Test
+  public void testReportCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "report", corruptRecordFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 8 Number of corrupt records: 2"));
+  }
+
+  @Test
+  public void testRepairAllCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "all",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "guava", "hazelnut");
+  }
+
+  @Test
+  public void testRepairAllCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "all",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "date", "guava",
+        "hazelnut");
+  }
+
+  @Test
+  public void testRepairPriorCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "prior",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "apple", "banana", "celery");
+  }
+
+  @Test
+  public void testRepairPriorCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "prior",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "date");
+  }
+
+  @Test
+  public void testRepairAfterCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "after",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "guava", "hazelnut");
+  }
+
+  @Test
+  public void testRepairAfterCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "after",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains("Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains("Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "guava", "hazelnut");
+  }
+
+  private void checkFileContains(File repairedFile, String... lines) throws IOException {
+    DataFileReader r = new DataFileReader<Utf8>(repairedFile,
+        new GenericDatumReader<Utf8>(SCHEMA));
+    for (String line : lines) {
+      assertEquals(line, r.next().toString());
+    }
+    assertFalse(r.hasNext());
+  }
+
+}



Mime
View raw message